refactor(cache): reorganize cache directory structure with automatic legacy cleanup

- Centralize cache path resolution in new py/utils/cache_paths.py module
- Migrate legacy cache files to organized structure: {settings_dir}/cache/{model|recipe|fts|symlink}/
- Automatically clean up legacy files after successful migration with integrity verification
- Update Config symlink cache to use new path and migrate from old location
- Simplify service classes (PersistentModelCache, PersistentRecipeCache, RecipeFTSIndex, TagFTSIndex) to use centralized migration logic
- Add comprehensive test coverage for cache paths and automatic cleanup
This commit is contained in:
Will Miao
2026-01-26 16:12:08 +08:00
parent e14afde4b3
commit 7b0c6c8bab
9 changed files with 1108 additions and 74 deletions

View File

@@ -9,6 +9,7 @@ import json
import urllib.parse
import time
from .utils.cache_paths import CacheType, get_cache_file_path, get_legacy_cache_paths
from .utils.settings_paths import ensure_settings_file, get_settings_dir, load_settings_template
# Use an environment variable to control standalone mode
@@ -227,9 +228,8 @@ class Config:
return os.path.normpath(path).replace(os.sep, '/')
def _get_symlink_cache_path(self) -> Path:
cache_dir = Path(get_settings_dir(create=True)) / "cache"
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir / "symlink_map.json"
canonical_path = get_cache_file_path(CacheType.SYMLINK, create_dir=True)
return Path(canonical_path)
def _symlink_roots(self) -> List[str]:
roots: List[str] = []
@@ -280,14 +280,28 @@ class Config:
def _load_persisted_cache_into_mappings(self) -> bool:
"""Load the symlink cache and store its fingerprint for comparison."""
cache_path = self._get_symlink_cache_path()
if not cache_path.exists():
return False
try:
with cache_path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
except Exception as exc:
logger.info("Failed to load symlink cache %s: %s", cache_path, exc)
# Check canonical path first, then legacy paths for migration
paths_to_check = [cache_path]
legacy_paths = get_legacy_cache_paths(CacheType.SYMLINK)
paths_to_check.extend(Path(p) for p in legacy_paths if p != str(cache_path))
loaded_path = None
payload = None
for check_path in paths_to_check:
if not check_path.exists():
continue
try:
with check_path.open("r", encoding="utf-8") as handle:
payload = json.load(handle)
loaded_path = check_path
break
except Exception as exc:
logger.info("Failed to load symlink cache %s: %s", check_path, exc)
continue
if payload is None:
return False
if not isinstance(payload, dict):
@@ -307,7 +321,37 @@ class Config:
normalized_mappings[self._normalize_path(target)] = self._normalize_path(link)
self._path_mappings = normalized_mappings
logger.info("Symlink cache loaded with %d mappings", len(self._path_mappings))
# Log migration if loaded from legacy path
if loaded_path is not None and loaded_path != cache_path:
logger.info(
"Symlink cache migrated from %s (will save to %s)",
loaded_path,
cache_path,
)
try:
if loaded_path.exists():
loaded_path.unlink()
logger.info("Cleaned up legacy symlink cache: %s", loaded_path)
try:
parent_dir = loaded_path.parent
if parent_dir.name == "cache" and not any(parent_dir.iterdir()):
parent_dir.rmdir()
logger.info("Removed empty legacy cache directory: %s", parent_dir)
except Exception:
pass
except Exception as exc:
logger.warning(
"Failed to cleanup legacy symlink cache %s: %s",
loaded_path,
exc,
)
else:
logger.info("Symlink cache loaded with %d mappings", len(self._path_mappings))
return True
def _save_symlink_cache(self) -> None:

View File

@@ -1,13 +1,12 @@
import json
import logging
import os
import re
import sqlite3
import threading
from dataclasses import dataclass
from typing import Dict, List, Mapping, Optional, Sequence, Tuple
from ..utils.settings_paths import get_project_root, get_settings_dir
from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration
logger = logging.getLogger(__name__)
@@ -404,20 +403,12 @@ class PersistentModelCache:
# Internal helpers -------------------------------------------------
def _resolve_default_path(self, library_name: str) -> str:
override = os.environ.get("LORA_MANAGER_CACHE_DB")
if override:
return override
try:
settings_dir = get_settings_dir(create=True)
except Exception as exc: # pragma: no cover - defensive guard
logger.warning("Falling back to project directory for cache: %s", exc)
settings_dir = get_project_root()
safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", library_name or "default")
if safe_name.lower() in ("default", ""):
legacy_path = os.path.join(settings_dir, self._DEFAULT_FILENAME)
if os.path.exists(legacy_path):
return legacy_path
return os.path.join(settings_dir, "model_cache", f"{safe_name}.sqlite")
env_override = os.environ.get("LORA_MANAGER_CACHE_DB")
return resolve_cache_path_with_migration(
CacheType.MODEL,
library_name=library_name,
env_override=env_override,
)
def _initialize_schema(self) -> None:
with self._db_lock:

View File

@@ -10,13 +10,12 @@ from __future__ import annotations
import json
import logging
import os
import re
import sqlite3
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple
from ..utils.settings_paths import get_project_root, get_settings_dir
from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration
logger = logging.getLogger(__name__)
@@ -312,20 +311,12 @@ class PersistentRecipeCache:
# Internal helpers
def _resolve_default_path(self, library_name: str) -> str:
override = os.environ.get("LORA_MANAGER_RECIPE_CACHE_DB")
if override:
return override
try:
settings_dir = get_settings_dir(create=True)
except Exception as exc:
logger.warning("Falling back to project directory for recipe cache: %s", exc)
settings_dir = get_project_root()
safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", library_name or "default")
if safe_name.lower() in ("default", ""):
legacy_path = os.path.join(settings_dir, self._DEFAULT_FILENAME)
if os.path.exists(legacy_path):
return legacy_path
return os.path.join(settings_dir, "recipe_cache", f"{safe_name}.sqlite")
env_override = os.environ.get("LORA_MANAGER_RECIPE_CACHE_DB")
return resolve_cache_path_with_migration(
CacheType.RECIPE,
library_name=library_name,
env_override=env_override,
)
def _initialize_schema(self) -> None:
with self._db_lock:

View File

@@ -15,7 +15,7 @@ import threading
import time
from typing import Any, Dict, List, Optional, Set
from ..utils.settings_paths import get_settings_dir
from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration
logger = logging.getLogger(__name__)
@@ -67,17 +67,11 @@ class RecipeFTSIndex:
def _resolve_default_path(self) -> str:
"""Resolve the default database path."""
override = os.environ.get("LORA_MANAGER_RECIPE_FTS_DB")
if override:
return override
try:
settings_dir = get_settings_dir(create=True)
except Exception as exc:
logger.warning("Falling back to current directory for FTS index: %s", exc)
settings_dir = "."
return os.path.join(settings_dir, self._DEFAULT_FILENAME)
env_override = os.environ.get("LORA_MANAGER_RECIPE_FTS_DB")
return resolve_cache_path_with_migration(
CacheType.RECIPE_FTS,
env_override=env_override,
)
def get_database_path(self) -> str:
"""Return the resolved database path."""

View File

@@ -16,7 +16,7 @@ import time
from pathlib import Path
from typing import Dict, List, Optional, Set
from ..utils.settings_paths import get_settings_dir
from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration
logger = logging.getLogger(__name__)
@@ -89,17 +89,11 @@ class TagFTSIndex:
def _resolve_default_db_path(self) -> str:
"""Resolve the default database path."""
override = os.environ.get("LORA_MANAGER_TAG_FTS_DB")
if override:
return override
try:
settings_dir = get_settings_dir(create=True)
except Exception as exc:
logger.warning("Falling back to current directory for FTS index: %s", exc)
settings_dir = "."
return os.path.join(settings_dir, self._DEFAULT_FILENAME)
env_override = os.environ.get("LORA_MANAGER_TAG_FTS_DB")
return resolve_cache_path_with_migration(
CacheType.TAG_FTS,
env_override=env_override,
)
def _resolve_default_csv_path(self) -> str:
"""Resolve the default CSV file path."""

421
py/utils/cache_paths.py Normal file
View File

@@ -0,0 +1,421 @@
"""Centralized cache path resolution with automatic migration support.
This module provides a unified interface for resolving cache file paths,
with automatic migration from legacy locations to the new organized
cache directory structure.
Target structure:
{settings_dir}/
└── cache/
├── symlink/
│ └── symlink_map.json
├── model/
│ └── {library_name}.sqlite
├── recipe/
│ └── {library_name}.sqlite
└── fts/
├── recipe_fts.sqlite
└── tag_fts.sqlite
"""
from __future__ import annotations
import logging
import os
import re
import shutil
from enum import Enum
from typing import List, Optional
from .settings_paths import get_project_root, get_settings_dir
logger = logging.getLogger(__name__)
class CacheType(Enum):
    """Types of cache files managed by the cache path resolver."""

    # Per-library SQLite cache (filename derived from the library name).
    MODEL = "model"
    # Per-library SQLite recipe cache (filename derived from the library name).
    RECIPE = "recipe"
    # Single full-text-search index file for recipes.
    RECIPE_FTS = "recipe_fts"
    # Single full-text-search index file for tags.
    TAG_FTS = "tag_fts"
    # Symlink mapping persisted as JSON (symlink_map.json).
    SYMLINK = "symlink"
# Subdirectory structure for each cache type, relative to {settings_dir}/cache/.
# NOTE: both FTS indexes deliberately share the "fts" subdirectory.
_CACHE_SUBDIRS = {
    CacheType.MODEL: "model",
    CacheType.RECIPE: "recipe",
    CacheType.RECIPE_FTS: "fts",
    CacheType.TAG_FTS: "fts",
    CacheType.SYMLINK: "symlink",
}

# Filename patterns for each cache type. "{library_name}" is filled in with the
# sanitized library name for the per-library (MODEL/RECIPE) caches; the other
# cache types use a fixed filename.
_CACHE_FILENAMES = {
    CacheType.MODEL: "{library_name}.sqlite",
    CacheType.RECIPE: "{library_name}.sqlite",
    CacheType.RECIPE_FTS: "recipe_fts.sqlite",
    CacheType.TAG_FTS: "tag_fts.sqlite",
    CacheType.SYMLINK: "symlink_map.json",
}
def get_cache_base_dir(create: bool = True) -> str:
    """Resolve the root cache directory, i.e. ``{settings_dir}/cache/``.

    Args:
        create: When True, ensure the directory exists on disk.

    Returns:
        Absolute path to the cache base directory.
    """
    base = os.path.join(get_settings_dir(create=create), "cache")
    if create:
        os.makedirs(base, exist_ok=True)
    return base
def _sanitize_library_name(library_name: Optional[str]) -> str:
"""Sanitize a library name for use in filenames.
Args:
library_name: The library name to sanitize.
Returns:
A sanitized version safe for use in filenames.
"""
name = library_name or "default"
return re.sub(r"[^A-Za-z0-9_.-]", "_", name)
def get_cache_file_path(
    cache_type: CacheType,
    library_name: Optional[str] = None,
    create_dir: bool = True,
) -> str:
    """Return the canonical on-disk location for a cache file.

    Args:
        cache_type: Which cache the path is for.
        library_name: Library identifier (only meaningful for MODEL/RECIPE).
        create_dir: When True, create the parent directory if missing.

    Returns:
        Absolute path of the cache file in the organized layout.
    """
    target_dir = os.path.join(
        get_cache_base_dir(create=create_dir), _CACHE_SUBDIRS[cache_type]
    )
    if create_dir:
        os.makedirs(target_dir, exist_ok=True)
    filename = _CACHE_FILENAMES[cache_type].format(
        library_name=_sanitize_library_name(library_name)
    )
    return os.path.join(target_dir, filename)
def get_legacy_cache_paths(
    cache_type: CacheType,
    library_name: Optional[str] = None,
) -> List[str]:
    """Enumerate legacy locations to probe when migrating *cache_type*.

    Paths are ordered by preference (newest legacy layout first); callers
    are expected to check each path for existence.

    Args:
        cache_type: Which cache to enumerate legacy paths for.
        library_name: Library identifier (only used for MODEL/RECIPE).

    Returns:
        Candidate legacy file paths, most preferred first.
    """
    try:
        base_dir = get_settings_dir(create=False)
    except Exception:
        base_dir = get_project_root()
    safe_name = _sanitize_library_name(library_name)
    candidates: List[str] = []
    if cache_type in (CacheType.MODEL, CacheType.RECIPE):
        prefix = "model_cache" if cache_type == CacheType.MODEL else "recipe_cache"
        # Per-library layout: {settings_dir}/{model|recipe}_cache/{library}.sqlite
        candidates.append(os.path.join(base_dir, prefix, f"{safe_name}.sqlite"))
        # Oldest layout: a single root-level database (default library only).
        if safe_name.lower() in ("default", ""):
            candidates.append(os.path.join(base_dir, f"{prefix}.sqlite"))
    elif cache_type == CacheType.RECIPE_FTS:
        # Legacy root-level path
        candidates.append(os.path.join(base_dir, "recipe_fts.sqlite"))
    elif cache_type == CacheType.TAG_FTS:
        # Legacy root-level path
        candidates.append(os.path.join(base_dir, "tag_fts.sqlite"))
    elif cache_type == CacheType.SYMLINK:
        # Previous location inside cache/ but without the symlink subfolder.
        candidates.append(os.path.join(base_dir, "cache", "symlink_map.json"))
    return candidates
def _cleanup_legacy_file_after_migration(
    legacy_path: str,
    canonical_path: str,
) -> bool:
    """Delete *legacy_path* once its copy at *canonical_path* is verified.

    Verification is a size comparison between the two files; on any
    mismatch or error the legacy file is left untouched.

    Args:
        legacy_path: The legacy file path to remove.
        canonical_path: The destination the file was migrated to.

    Returns:
        True when the legacy file was removed, False otherwise.
    """
    try:
        if not os.path.exists(canonical_path):
            logger.warning(
                "Skipping cleanup of %s: canonical file not found at %s",
                legacy_path,
                canonical_path,
            )
            return False
        legacy_size = os.path.getsize(legacy_path)
        canonical_size = os.path.getsize(canonical_path)
        if legacy_size != canonical_size:
            logger.warning(
                "Skipping cleanup of %s: file size mismatch (legacy=%d, canonical=%d)",
                legacy_path,
                legacy_size,
                canonical_size,
            )
            return False
        os.remove(legacy_path)
        logger.info("Cleaned up legacy cache file: %s", legacy_path)
        # Best-effort: prune now-empty legacy directories as well.
        _cleanup_empty_legacy_directories(legacy_path)
        return True
    except Exception as exc:
        logger.warning(
            "Failed to cleanup legacy cache file %s: %s",
            legacy_path,
            exc,
        )
        return False
def _cleanup_empty_legacy_directories(legacy_path: str) -> None:
    """Walk up from *legacy_path* and prune empty legacy cache directories.

    Only directories literally named ``model_cache`` or ``recipe_cache``
    are candidates, and each is removed only when it is empty (``rmdir``
    would fail otherwise). All failures are swallowed: this is a
    best-effort cleanup.

    Args:
        legacy_path: The legacy file path whose ancestors should be pruned.
    """
    try:
        legacy_dir_names = ("model_cache", "recipe_cache")
        current = os.path.dirname(legacy_path)
        while current:
            if os.path.basename(current) in legacy_dir_names:
                if os.path.isdir(current) and not os.listdir(current):
                    try:
                        os.rmdir(current)
                        logger.info("Removed empty legacy directory: %s", current)
                    except Exception:
                        pass
            parent = os.path.dirname(current)
            if parent == current:
                # Reached the filesystem root; stop climbing.
                break
            current = parent
    except Exception as exc:
        logger.debug("Failed to cleanup empty legacy directories: %s", exc)
def resolve_cache_path_with_migration(
    cache_type: CacheType,
    library_name: Optional[str] = None,
    env_override: Optional[str] = None,
) -> str:
    """Return the cache path to use, lazily migrating legacy files.

    Resolution order:
      1. ``env_override`` wins unconditionally (no migration performed).
      2. An existing file at the canonical location is used as-is.
      3. The first existing legacy file is copied to the canonical
         location, the legacy copy is cleaned up, and the canonical path
         is returned.
      4. Otherwise the canonical path is returned for fresh creation.

    Args:
        cache_type: The type of cache file.
        library_name: Library identifier (only used for MODEL/RECIPE).
        env_override: Explicit path from an environment variable, if set.

    Returns:
        The resolved path to use for the cache file.
    """
    if env_override:
        return env_override
    canonical = get_cache_file_path(cache_type, library_name, create_dir=True)
    if os.path.exists(canonical):
        return canonical
    for candidate in get_legacy_cache_paths(cache_type, library_name):
        if not os.path.exists(candidate):
            continue
        try:
            shutil.copy2(candidate, canonical)
            logger.info(
                "Migrated %s cache from %s to %s",
                cache_type.value,
                candidate,
                canonical,
            )
            _cleanup_legacy_file_after_migration(candidate, canonical)
            return canonical
        except Exception as exc:
            # Try the next legacy candidate rather than failing outright.
            logger.warning(
                "Failed to migrate %s cache from %s: %s",
                cache_type.value,
                candidate,
                exc,
            )
    # Nothing to migrate; the caller will create the file fresh.
    return canonical
def get_legacy_cache_files_for_cleanup() -> List[str]:
    """Enumerate legacy cache files whose canonical copies already exist.

    Returns:
        Legacy file paths that are safe to remove (possibly empty).
    """
    removable: List[str] = []
    try:
        settings_dir = get_settings_dir(create=False)
    except Exception:
        return removable
    for cache_type in CacheType:
        if cache_type not in (CacheType.MODEL, CacheType.RECIPE):
            removable_check = _check_legacy_for_cleanup
            removable_check(cache_type, None, removable)
            continue
        # Library-scoped caches: cover "default" plus any per-library files
        # still sitting in the legacy directory.
        _check_legacy_for_cleanup(cache_type, "default", removable)
        dir_name = "model_cache" if cache_type == CacheType.MODEL else "recipe_cache"
        legacy_dir = os.path.join(settings_dir, dir_name)
        if not os.path.isdir(legacy_dir):
            continue
        try:
            for entry in os.listdir(legacy_dir):
                if entry.endswith(".sqlite"):
                    library = entry[: -len(".sqlite")]
                    _check_legacy_for_cleanup(cache_type, library, removable)
        except Exception:
            pass
    return removable
def _check_legacy_for_cleanup(
    cache_type: CacheType,
    library_name: Optional[str],
    files_to_remove: List[str],
) -> None:
    """Append migrated legacy files for one cache/library to the list.

    A legacy file is only eligible when its canonical counterpart already
    exists on disk; duplicates are not appended twice.

    Args:
        cache_type: The type of cache file.
        library_name: Library identifier (only used for MODEL/RECIPE).
        files_to_remove: Accumulator list, mutated in place.
    """
    canonical = get_cache_file_path(cache_type, library_name, create_dir=False)
    if not os.path.exists(canonical):
        return
    for legacy in get_legacy_cache_paths(cache_type, library_name):
        if os.path.exists(legacy) and legacy not in files_to_remove:
            files_to_remove.append(legacy)
def cleanup_legacy_cache_files(dry_run: bool = True) -> List[str]:
    """Delete migrated legacy cache files (or just report them).

    Args:
        dry_run: When True, only report which files would be removed
            without touching the filesystem.

    Returns:
        The files that were removed (or would be, for a dry run).
    """
    candidates = get_legacy_cache_files_for_cleanup()
    if dry_run or not candidates:
        return candidates
    removed: List[str] = []
    for path in candidates:
        try:
            os.remove(path)
        except Exception as exc:
            logger.warning("Failed to remove legacy cache file %s: %s", path, exc)
        else:
            removed.append(path)
            logger.info("Removed legacy cache file: %s", path)
    # Best-effort sweep of now-empty legacy directories.
    try:
        settings_dir = get_settings_dir(create=False)
        for dir_name in ("model_cache", "recipe_cache"):
            legacy_dir = os.path.join(settings_dir, dir_name)
            if os.path.isdir(legacy_dir) and not os.listdir(legacy_dir):
                os.rmdir(legacy_dir)
                logger.info("Removed empty legacy directory: %s", legacy_dir)
    except Exception:
        pass
    return removed

View File

@@ -4,6 +4,7 @@ import os
import pytest
from py import config as config_module
from py.utils import cache_paths as cache_paths_module
def _normalize(path: str) -> str:
@@ -28,9 +29,14 @@ def _setup_paths(monkeypatch: pytest.MonkeyPatch, tmp_path):
}
return mapping.get(kind, [])
def fake_get_settings_dir(create: bool = True) -> str:
return str(settings_dir)
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
monkeypatch.setattr(config_module, "standalone_mode", True)
monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir))
monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir)
# Also patch cache_paths module which has its own import of get_settings_dir
monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir)
return loras_dir, settings_dir
@@ -57,7 +63,7 @@ def test_symlink_scan_skips_file_links(monkeypatch: pytest.MonkeyPatch, tmp_path
normalized_file_real = _normalize(os.path.realpath(file_target))
assert normalized_file_real not in cfg._path_mappings
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
assert cache_path.exists()
@@ -71,7 +77,7 @@ def test_symlink_cache_reuses_previous_scan(monkeypatch: pytest.MonkeyPatch, tmp
first_cfg = config_module.Config()
cached_mappings = dict(first_cfg._path_mappings)
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
assert cache_path.exists()
def fail_scan(self):
@@ -97,7 +103,7 @@ def test_symlink_cache_survives_noise_mtime(monkeypatch: pytest.MonkeyPatch, tmp
noise_file = recipes_dir / "touchme.txt"
first_cfg = config_module.Config()
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
assert cache_path.exists()
# Update a noisy path to bump parent directory mtime
@@ -164,9 +170,14 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path):
}
return mapping.get(kind, [])
def fake_get_settings_dir(create: bool = True) -> str:
return str(settings_dir)
monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
monkeypatch.setattr(config_module, "standalone_mode", True)
monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir))
monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir)
# Also patch cache_paths module which has its own import of get_settings_dir
monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir)
cfg = config_module.Config()
@@ -174,6 +185,65 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path):
normalized_link = _normalize(str(loras_link))
assert cfg._path_mappings[normalized_real] == normalized_link
cache_path = settings_dir / "cache" / "symlink_map.json"
cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
payload = json.loads(cache_path.read_text(encoding="utf-8"))
assert payload["path_mappings"][normalized_real] == normalized_link
def test_legacy_symlink_cache_automatic_cleanup(monkeypatch: pytest.MonkeyPatch, tmp_path):
    """Test that legacy symlink cache is automatically cleaned up after migration."""
    settings_dir = tmp_path / "settings"
    loras_dir = tmp_path / "loras"
    loras_dir.mkdir()
    checkpoint_dir = tmp_path / "checkpoints"
    checkpoint_dir.mkdir()
    embedding_dir = tmp_path / "embeddings"
    embedding_dir.mkdir()

    def fake_get_folder_paths(kind: str):
        # Minimal folder layout so Config() can resolve its model roots.
        mapping = {
            "loras": [str(loras_dir)],
            "checkpoints": [str(checkpoint_dir)],
            "unet": [],
            "embeddings": [str(embedding_dir)],
        }
        return mapping.get(kind, [])

    def fake_get_settings_dir(create: bool = True) -> str:
        return str(settings_dir)

    monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths)
    monkeypatch.setattr(config_module, "standalone_mode", True)
    monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir)
    # cache_paths imports get_settings_dir separately, so patch it there too.
    monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir)
    # Create legacy symlink cache at old location
    settings_dir.mkdir(parents=True, exist_ok=True)
    legacy_cache_dir = settings_dir / "cache"
    legacy_cache_dir.mkdir(exist_ok=True)
    legacy_cache_path = legacy_cache_dir / "symlink_map.json"
    # Write some legacy cache data
    legacy_data = {
        "fingerprint": {"roots": []},
        "path_mappings": {
            "/legacy/target": "/legacy/link"
        }
    }
    legacy_cache_path.write_text(json.dumps(legacy_data), encoding="utf-8")
    # Verify legacy file exists
    assert legacy_cache_path.exists()
    # Initialize Config - this should trigger migration and automatic cleanup
    cfg = config_module.Config()
    # New canonical cache should exist
    new_cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json"
    assert new_cache_path.exists()
    # Legacy file should be automatically cleaned up
    assert not legacy_cache_path.exists()
    # Config should still work correctly
    assert isinstance(cfg._path_mappings, dict)

View File

@@ -37,7 +37,7 @@ def test_portable_settings_use_project_root(tmp_path, monkeypatch):
monkeypatch.delenv("LORA_MANAGER_CACHE_DB", raising=False)
cache = cache_module.PersistentModelCache(library_name="portable_lib")
expected_cache_path = tmp_path / "model_cache" / "portable_lib.sqlite"
expected_cache_path = tmp_path / "cache" / "model" / "portable_lib.sqlite"
assert cache.get_database_path() == str(expected_cache_path)
assert expected_cache_path.parent.is_dir()

View File

@@ -0,0 +1,529 @@
"""Unit tests for the cache_paths module."""
import os
import shutil
import tempfile
from pathlib import Path
import pytest
from py.utils.cache_paths import (
CacheType,
cleanup_legacy_cache_files,
get_cache_base_dir,
get_cache_file_path,
get_legacy_cache_files_for_cleanup,
get_legacy_cache_paths,
resolve_cache_path_with_migration,
)
class TestCacheType:
    """Tests for the CacheType enum."""

    def test_enum_values(self):
        # Each member must serialize to its expected string value.
        expected = {
            CacheType.MODEL: "model",
            CacheType.RECIPE: "recipe",
            CacheType.RECIPE_FTS: "recipe_fts",
            CacheType.TAG_FTS: "tag_fts",
            CacheType.SYMLINK: "symlink",
        }
        for member, value in expected.items():
            assert member.value == value
class TestGetCacheBaseDir:
    """Tests for get_cache_base_dir function."""

    def test_returns_cache_subdirectory(self, tmp_path, monkeypatch):
        # Fix: the original test called get_cache_base_dir(create=True)
        # without patching get_settings_dir, creating directories in the
        # user's REAL settings directory. Isolate it like the sibling tests.
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        cache_dir = get_cache_base_dir(create=True)
        assert cache_dir.endswith("cache")
        assert os.path.isdir(cache_dir)

    def test_creates_directory_when_requested(self, tmp_path, monkeypatch):
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir)
        cache_dir = get_cache_base_dir(create=True)
        assert os.path.isdir(cache_dir)
        assert cache_dir == str(settings_dir / "cache")
class TestGetCacheFilePath:
    """Tests for get_cache_file_path function."""

    @staticmethod
    def _patched_settings(tmp_path, monkeypatch):
        # Point cache_paths at a throwaway settings directory.
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()
        monkeypatch.setattr(
            "py.utils.cache_paths.get_settings_dir",
            lambda create=True: str(settings_dir),
        )
        return settings_dir

    def test_model_cache_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        path = get_cache_file_path(CacheType.MODEL, "my_library", create_dir=True)
        expected = settings_dir / "cache" / "model" / "my_library.sqlite"
        assert path == str(expected)
        assert os.path.isdir(expected.parent)

    def test_recipe_cache_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        path = get_cache_file_path(CacheType.RECIPE, "default", create_dir=True)
        assert path == str(settings_dir / "cache" / "recipe" / "default.sqlite")

    def test_recipe_fts_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        path = get_cache_file_path(CacheType.RECIPE_FTS, create_dir=True)
        assert path == str(settings_dir / "cache" / "fts" / "recipe_fts.sqlite")

    def test_tag_fts_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        path = get_cache_file_path(CacheType.TAG_FTS, create_dir=True)
        assert path == str(settings_dir / "cache" / "fts" / "tag_fts.sqlite")

    def test_symlink_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        path = get_cache_file_path(CacheType.SYMLINK, create_dir=True)
        assert path == str(settings_dir / "cache" / "symlink" / "symlink_map.json")

    def test_sanitizes_library_name(self, tmp_path, monkeypatch):
        self._patched_settings(tmp_path, monkeypatch)
        path = get_cache_file_path(CacheType.MODEL, "my/bad:name", create_dir=True)
        assert "my_bad_name" in path

    def test_none_library_name_defaults_to_default(self, tmp_path, monkeypatch):
        self._patched_settings(tmp_path, monkeypatch)
        path = get_cache_file_path(CacheType.MODEL, None, create_dir=True)
        assert "default.sqlite" in path
class TestGetLegacyCachePaths:
    """Tests for get_legacy_cache_paths function."""

    @staticmethod
    def _patched_settings(tmp_path, monkeypatch):
        # Point cache_paths at a throwaway settings directory.
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()
        monkeypatch.setattr(
            "py.utils.cache_paths.get_settings_dir",
            lambda create=True: str(settings_dir),
        )
        return settings_dir

    def test_model_legacy_paths_for_default(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        paths = get_legacy_cache_paths(CacheType.MODEL, "default")
        assert len(paths) == 2
        assert str(settings_dir / "model_cache" / "default.sqlite") in paths
        assert str(settings_dir / "model_cache.sqlite") in paths

    def test_model_legacy_paths_for_named_library(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        paths = get_legacy_cache_paths(CacheType.MODEL, "my_library")
        # Named libraries never had a root-level cache file.
        assert paths == [str(settings_dir / "model_cache" / "my_library.sqlite")]

    def test_recipe_legacy_paths(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        paths = get_legacy_cache_paths(CacheType.RECIPE, "default")
        assert len(paths) == 2
        assert str(settings_dir / "recipe_cache" / "default.sqlite") in paths
        assert str(settings_dir / "recipe_cache.sqlite") in paths

    def test_recipe_fts_legacy_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        paths = get_legacy_cache_paths(CacheType.RECIPE_FTS)
        assert paths == [str(settings_dir / "recipe_fts.sqlite")]

    def test_tag_fts_legacy_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        paths = get_legacy_cache_paths(CacheType.TAG_FTS)
        assert paths == [str(settings_dir / "tag_fts.sqlite")]

    def test_symlink_legacy_path(self, tmp_path, monkeypatch):
        settings_dir = self._patched_settings(tmp_path, monkeypatch)
        paths = get_legacy_cache_paths(CacheType.SYMLINK)
        assert paths == [str(settings_dir / "cache" / "symlink_map.json")]
class TestResolveCachePathWithMigration:
    """Tests for resolve_cache_path_with_migration function."""

    @staticmethod
    def _settings_dir(tmp_path, monkeypatch):
        """Create an isolated settings dir and redirect cache_paths to it.

        Returns the directory as a Path so tests can build expected
        canonical/legacy locations under it.
        """
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr(
            "py.utils.cache_paths.get_settings_dir", fake_get_settings_dir
        )
        return settings_dir

    def test_returns_env_override_when_set(self, tmp_path, monkeypatch):
        """An explicit env override wins over canonical/legacy resolution."""
        self._settings_dir(tmp_path, monkeypatch)
        override_path = "/custom/path/cache.sqlite"
        path = resolve_cache_path_with_migration(
            CacheType.MODEL,
            library_name="default",
            env_override=override_path,
        )
        assert path == override_path

    def test_returns_canonical_path_when_exists(self, tmp_path, monkeypatch):
        """An existing canonical file is used as-is, without migration."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        # Create the canonical path up front.
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        canonical.parent.mkdir(parents=True)
        canonical.write_text("existing")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        assert path == str(canonical)

    def test_migrates_from_legacy_root_level_cache(self, tmp_path, monkeypatch):
        """A root-level legacy cache is copied to the canonical path, then removed."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        # Create legacy cache at root level.
        legacy_path = settings_dir / "model_cache.sqlite"
        legacy_path.write_text("legacy data")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        # Should return canonical path.
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        assert path == str(canonical)
        # File content must survive the migration intact.
        assert canonical.exists()
        assert canonical.read_text() == "legacy data"
        # Legacy file should be automatically cleaned up.
        assert not legacy_path.exists()

    def test_migrates_from_legacy_per_library_cache(self, tmp_path, monkeypatch):
        """A per-library legacy cache is migrated and its empty dir removed."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        # Create legacy per-library cache.
        legacy_dir = settings_dir / "model_cache"
        legacy_dir.mkdir()
        legacy_path = legacy_dir / "my_library.sqlite"
        legacy_path.write_text("legacy library data")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "my_library")
        # Should return canonical path.
        canonical = settings_dir / "cache" / "model" / "my_library.sqlite"
        assert path == str(canonical)
        assert canonical.exists()
        assert canonical.read_text() == "legacy library data"
        # Legacy file should be automatically cleaned up.
        assert not legacy_path.exists()
        # Empty legacy directory should be cleaned up.
        assert not legacy_dir.exists()

    def test_prefers_per_library_over_root_for_migration(self, tmp_path, monkeypatch):
        """When both legacy forms exist, the per-library file is migrated."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        # Create both legacy caches.
        legacy_root = settings_dir / "model_cache.sqlite"
        legacy_root.write_text("root legacy")
        legacy_dir = settings_dir / "model_cache"
        legacy_dir.mkdir()
        legacy_lib = legacy_dir / "default.sqlite"
        legacy_lib.write_text("library legacy")
        path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        assert path == str(canonical)
        # Should migrate from per-library path (first in legacy list).
        assert canonical.read_text() == "library legacy"

    def test_returns_canonical_path_when_no_legacy_exists(self, tmp_path, monkeypatch):
        """With nothing to migrate, the canonical dir is created but no file."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        path = resolve_cache_path_with_migration(CacheType.MODEL, "new_library")
        canonical = settings_dir / "cache" / "model" / "new_library.sqlite"
        assert path == str(canonical)
        # Directory should be created.
        assert canonical.parent.exists()
        # But file should not exist yet.
        assert not canonical.exists()
class TestLegacyCacheCleanup:
    """Tests for legacy cache cleanup functions."""

    @staticmethod
    def _seed_caches(tmp_path, monkeypatch):
        """Redirect cache_paths to a temp settings dir seeded with both a
        canonical model cache and a root-level legacy file.

        Returns the legacy file's Path for per-test assertions.
        """
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr(
            "py.utils.cache_paths.get_settings_dir", fake_get_settings_dir
        )
        # Create canonical and legacy files.
        canonical = settings_dir / "cache" / "model" / "default.sqlite"
        canonical.parent.mkdir(parents=True)
        canonical.write_text("canonical")
        legacy = settings_dir / "model_cache.sqlite"
        legacy.write_text("legacy")
        return legacy

    def test_get_legacy_cache_files_for_cleanup(self, tmp_path, monkeypatch):
        """The legacy root-level file is reported as eligible for cleanup."""
        legacy = self._seed_caches(tmp_path, monkeypatch)
        files = get_legacy_cache_files_for_cleanup()
        assert str(legacy) in files

    def test_cleanup_legacy_cache_files_dry_run(self, tmp_path, monkeypatch):
        """A dry run reports the legacy file but leaves it on disk."""
        legacy = self._seed_caches(tmp_path, monkeypatch)
        removed = cleanup_legacy_cache_files(dry_run=True)
        assert str(legacy) in removed
        # File should still exist (dry run).
        assert legacy.exists()

    def test_cleanup_legacy_cache_files_actual(self, tmp_path, monkeypatch):
        """A real run reports and deletes the legacy file."""
        legacy = self._seed_caches(tmp_path, monkeypatch)
        removed = cleanup_legacy_cache_files(dry_run=False)
        assert str(legacy) in removed
        # File should be deleted.
        assert not legacy.exists()
class TestAutomaticCleanup:
    """Tests for automatic cleanup during migration."""

    @staticmethod
    def _settings_dir(tmp_path, monkeypatch):
        """Create an isolated settings dir and redirect cache_paths to it."""
        settings_dir = tmp_path / "settings"
        settings_dir.mkdir()

        def fake_get_settings_dir(create: bool = True) -> str:
            return str(settings_dir)

        monkeypatch.setattr(
            "py.utils.cache_paths.get_settings_dir", fake_get_settings_dir
        )
        return settings_dir

    def test_automatic_cleanup_on_migration(self, tmp_path, monkeypatch):
        """Test that legacy files are automatically cleaned up after migration."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        # Create a legacy cache file.
        legacy_dir = settings_dir / "model_cache"
        legacy_dir.mkdir()
        legacy_file = legacy_dir / "default.sqlite"
        legacy_file.write_text("test data")
        # Verify legacy file exists.
        assert legacy_file.exists()
        # Trigger migration (this should auto-cleanup).
        resolved_path = resolve_cache_path_with_migration(CacheType.MODEL, "default")
        # Verify canonical file exists.
        canonical_path = settings_dir / "cache" / "model" / "default.sqlite"
        assert resolved_path == str(canonical_path)
        assert canonical_path.exists()
        assert canonical_path.read_text() == "test data"
        # Verify legacy file was cleaned up.
        assert not legacy_file.exists()
        # Verify empty directory was cleaned up.
        assert not legacy_dir.exists()

    def test_automatic_cleanup_with_verification(self, tmp_path, monkeypatch):
        """Test that cleanup verifies file integrity before deletion."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        # Create legacy cache.
        legacy_dir = settings_dir / "recipe_cache"
        legacy_dir.mkdir()
        legacy_file = legacy_dir / "my_library.sqlite"
        legacy_file.write_text("data")
        # Trigger migration.
        resolved_path = resolve_cache_path_with_migration(CacheType.RECIPE, "my_library")
        canonical_path = settings_dir / "cache" / "recipe" / "my_library.sqlite"
        # Migration must have produced the canonical file.
        assert canonical_path.exists()
        # Auto-cleanup removes the legacy file once the copy is verified.
        assert not legacy_file.exists()
        # File content should match (integrity check).
        assert canonical_path.read_text() == "data"
        # Directory should be cleaned up.
        assert not legacy_dir.exists()

    def test_automatic_cleanup_multiple_cache_types(self, tmp_path, monkeypatch):
        """Test automatic cleanup for different cache types."""
        settings_dir = self._settings_dir(tmp_path, monkeypatch)
        # Test RECIPE_FTS migration.
        legacy_fts = settings_dir / "recipe_fts.sqlite"
        legacy_fts.write_text("fts data")
        resolve_cache_path_with_migration(CacheType.RECIPE_FTS)
        canonical_fts = settings_dir / "cache" / "fts" / "recipe_fts.sqlite"
        assert canonical_fts.exists()
        assert not legacy_fts.exists()
        # Test TAG_FTS migration.
        legacy_tag = settings_dir / "tag_fts.sqlite"
        legacy_tag.write_text("tag data")
        resolve_cache_path_with_migration(CacheType.TAG_FTS)
        canonical_tag = settings_dir / "cache" / "fts" / "tag_fts.sqlite"
        assert canonical_tag.exists()
        assert not legacy_tag.exists()