diff --git a/py/config.py b/py/config.py index 315278e6..9c9b0570 100644 --- a/py/config.py +++ b/py/config.py @@ -9,6 +9,7 @@ import json import urllib.parse import time +from .utils.cache_paths import CacheType, get_cache_file_path, get_legacy_cache_paths from .utils.settings_paths import ensure_settings_file, get_settings_dir, load_settings_template # Use an environment variable to control standalone mode @@ -227,9 +228,8 @@ class Config: return os.path.normpath(path).replace(os.sep, '/') def _get_symlink_cache_path(self) -> Path: - cache_dir = Path(get_settings_dir(create=True)) / "cache" - cache_dir.mkdir(parents=True, exist_ok=True) - return cache_dir / "symlink_map.json" + canonical_path = get_cache_file_path(CacheType.SYMLINK, create_dir=True) + return Path(canonical_path) def _symlink_roots(self) -> List[str]: roots: List[str] = [] @@ -280,14 +280,28 @@ class Config: def _load_persisted_cache_into_mappings(self) -> bool: """Load the symlink cache and store its fingerprint for comparison.""" cache_path = self._get_symlink_cache_path() - if not cache_path.exists(): - return False - try: - with cache_path.open("r", encoding="utf-8") as handle: - payload = json.load(handle) - except Exception as exc: - logger.info("Failed to load symlink cache %s: %s", cache_path, exc) + # Check canonical path first, then legacy paths for migration + paths_to_check = [cache_path] + legacy_paths = get_legacy_cache_paths(CacheType.SYMLINK) + paths_to_check.extend(Path(p) for p in legacy_paths if p != str(cache_path)) + + loaded_path = None + payload = None + + for check_path in paths_to_check: + if not check_path.exists(): + continue + try: + with check_path.open("r", encoding="utf-8") as handle: + payload = json.load(handle) + loaded_path = check_path + break + except Exception as exc: + logger.info("Failed to load symlink cache %s: %s", check_path, exc) + continue + + if payload is None: return False if not isinstance(payload, dict): @@ -307,7 +321,37 @@ class Config: normalized_mappings[self._normalize_path(target)] = self._normalize_path(link) self._path_mappings = normalized_mappings - logger.info("Symlink cache loaded with %d mappings", len(self._path_mappings)) + + # Log migration if loaded from legacy path + if loaded_path is not None and loaded_path != cache_path: + logger.info( + "Symlink cache migrated from %s (will save to %s)", + loaded_path, + cache_path, + ) + + try: + if loaded_path.exists(): + loaded_path.unlink() + logger.info("Cleaned up legacy symlink cache: %s", loaded_path) + + try: + parent_dir = loaded_path.parent + if parent_dir.name == "cache" and not any(parent_dir.iterdir()): + parent_dir.rmdir() + logger.info("Removed empty legacy cache directory: %s", parent_dir) + except Exception: + pass + + except Exception as exc: + logger.warning( + "Failed to cleanup legacy symlink cache %s: %s", + loaded_path, + exc, + ) + else: + logger.info("Symlink cache loaded with %d mappings", len(self._path_mappings)) + return True def _save_symlink_cache(self) -> None: diff --git a/py/services/persistent_model_cache.py b/py/services/persistent_model_cache.py index c3ebcc27..9174a64d 100644 --- a/py/services/persistent_model_cache.py +++ b/py/services/persistent_model_cache.py @@ -1,13 +1,12 @@ import json import logging import os -import re import sqlite3 import threading from dataclasses import dataclass from typing import Dict, List, Mapping, Optional, Sequence, Tuple -from ..utils.settings_paths import get_project_root, get_settings_dir +from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration logger = logging.getLogger(__name__) @@ -404,20 +403,12 @@ class PersistentModelCache: # Internal helpers ------------------------------------------------- def _resolve_default_path(self, library_name: str) -> str: - override = os.environ.get("LORA_MANAGER_CACHE_DB") - if override: - return override - try: - settings_dir = get_settings_dir(create=True) - except Exception as exc: # pragma: no cover - defensive guard - logger.warning("Falling back to project directory for cache: %s", exc) - settings_dir = get_project_root() - safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", library_name or "default") - if safe_name.lower() in ("default", ""): - legacy_path = os.path.join(settings_dir, self._DEFAULT_FILENAME) - if os.path.exists(legacy_path): - return legacy_path - return os.path.join(settings_dir, "model_cache", f"{safe_name}.sqlite") + env_override = os.environ.get("LORA_MANAGER_CACHE_DB") + return resolve_cache_path_with_migration( + CacheType.MODEL, + library_name=library_name, + env_override=env_override, + ) def _initialize_schema(self) -> None: with self._db_lock: diff --git a/py/services/persistent_recipe_cache.py b/py/services/persistent_recipe_cache.py index ceac32c9..438ff7ae 100644 --- a/py/services/persistent_recipe_cache.py +++ b/py/services/persistent_recipe_cache.py @@ -10,13 +10,12 @@ from __future__ import annotations import json import logging import os -import re import sqlite3 import threading from dataclasses import dataclass from typing import Dict, List, Optional, Set, Tuple -from ..utils.settings_paths import get_project_root, get_settings_dir +from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration logger = logging.getLogger(__name__) @@ -312,20 +311,12 @@ class PersistentRecipeCache: # Internal helpers def _resolve_default_path(self, library_name: str) -> str: - override = os.environ.get("LORA_MANAGER_RECIPE_CACHE_DB") - if override: - return override - try: - settings_dir = get_settings_dir(create=True) - except Exception as exc: - logger.warning("Falling back to project directory for recipe cache: %s", exc) - settings_dir = get_project_root() - safe_name = re.sub(r"[^A-Za-z0-9_.-]", "_", library_name or "default") - if safe_name.lower() in ("default", ""): - legacy_path = os.path.join(settings_dir, self._DEFAULT_FILENAME) - if os.path.exists(legacy_path): - return legacy_path - return os.path.join(settings_dir, "recipe_cache", f"{safe_name}.sqlite") + env_override = os.environ.get("LORA_MANAGER_RECIPE_CACHE_DB") + return resolve_cache_path_with_migration( + CacheType.RECIPE, + library_name=library_name, + env_override=env_override, + ) def _initialize_schema(self) -> None: with self._db_lock: diff --git a/py/services/recipe_fts_index.py b/py/services/recipe_fts_index.py index c49bb5dc..29357ee1 100644 --- a/py/services/recipe_fts_index.py +++ b/py/services/recipe_fts_index.py @@ -15,7 +15,7 @@ import threading import time from typing import Any, Dict, List, Optional, Set -from ..utils.settings_paths import get_settings_dir +from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration logger = logging.getLogger(__name__) @@ -67,17 +67,11 @@ class RecipeFTSIndex: def _resolve_default_path(self) -> str: """Resolve the default database path.""" - override = os.environ.get("LORA_MANAGER_RECIPE_FTS_DB") - if override: - return override - - try: - settings_dir = get_settings_dir(create=True) - except Exception as exc: - logger.warning("Falling back to current directory for FTS index: %s", exc) - settings_dir = "." - - return os.path.join(settings_dir, self._DEFAULT_FILENAME) + env_override = os.environ.get("LORA_MANAGER_RECIPE_FTS_DB") + return resolve_cache_path_with_migration( + CacheType.RECIPE_FTS, + env_override=env_override, + ) def get_database_path(self) -> str: """Return the resolved database path.""" diff --git a/py/services/tag_fts_index.py b/py/services/tag_fts_index.py index 02f97d02..c3f6cc9b 100644 --- a/py/services/tag_fts_index.py +++ b/py/services/tag_fts_index.py @@ -16,7 +16,7 @@ import time from pathlib import Path from typing import Dict, List, Optional, Set -from ..utils.settings_paths import get_settings_dir +from ..utils.cache_paths import CacheType, resolve_cache_path_with_migration logger = logging.getLogger(__name__) @@ -89,17 +89,11 @@ class TagFTSIndex: def _resolve_default_db_path(self) -> str: """Resolve the default database path.""" - override = os.environ.get("LORA_MANAGER_TAG_FTS_DB") - if override: - return override - - try: - settings_dir = get_settings_dir(create=True) - except Exception as exc: - logger.warning("Falling back to current directory for FTS index: %s", exc) - settings_dir = "." - - return os.path.join(settings_dir, self._DEFAULT_FILENAME) + env_override = os.environ.get("LORA_MANAGER_TAG_FTS_DB") + return resolve_cache_path_with_migration( + CacheType.TAG_FTS, + env_override=env_override, + ) def _resolve_default_csv_path(self) -> str: """Resolve the default CSV file path.""" diff --git a/py/utils/cache_paths.py b/py/utils/cache_paths.py new file mode 100644 index 00000000..c7658a44 --- /dev/null +++ b/py/utils/cache_paths.py @@ -0,0 +1,421 @@ +"""Centralized cache path resolution with automatic migration support. + +This module provides a unified interface for resolving cache file paths, +with automatic migration from legacy locations to the new organized +cache directory structure. + +Target structure: + {settings_dir}/ + └── cache/ + ├── symlink/ + │ └── symlink_map.json + ├── model/ + │ └── {library_name}.sqlite + ├── recipe/ + │ └── {library_name}.sqlite + └── fts/ + ├── recipe_fts.sqlite + └── tag_fts.sqlite +""" + +from __future__ import annotations + +import logging +import os +import re +import shutil +from enum import Enum +from typing import List, Optional + +from .settings_paths import get_project_root, get_settings_dir + +logger = logging.getLogger(__name__) + + +class CacheType(Enum): + """Types of cache files managed by the cache path resolver.""" + + MODEL = "model" + RECIPE = "recipe" + RECIPE_FTS = "recipe_fts" + TAG_FTS = "tag_fts" + SYMLINK = "symlink" + + +# Subdirectory structure for each cache type +_CACHE_SUBDIRS = { + CacheType.MODEL: "model", + CacheType.RECIPE: "recipe", + CacheType.RECIPE_FTS: "fts", + CacheType.TAG_FTS: "fts", + CacheType.SYMLINK: "symlink", +} + +# Filename patterns for each cache type +_CACHE_FILENAMES = { + CacheType.MODEL: "{library_name}.sqlite", + CacheType.RECIPE: "{library_name}.sqlite", + CacheType.RECIPE_FTS: "recipe_fts.sqlite", + CacheType.TAG_FTS: "tag_fts.sqlite", + CacheType.SYMLINK: "symlink_map.json", +} + + +def get_cache_base_dir(create: bool = True) -> str: + """Return the base cache directory path. + + Args: + create: Whether to create the directory if it does not exist. + + Returns: + The absolute path to the cache base directory ({settings_dir}/cache/). + """ + settings_dir = get_settings_dir(create=create) + cache_dir = os.path.join(settings_dir, "cache") + if create: + os.makedirs(cache_dir, exist_ok=True) + return cache_dir + + +def _sanitize_library_name(library_name: Optional[str]) -> str: + """Sanitize a library name for use in filenames. + + Args: + library_name: The library name to sanitize. + + Returns: + A sanitized version safe for use in filenames. + """ + name = library_name or "default" + return re.sub(r"[^A-Za-z0-9_.-]", "_", name) + + +def get_cache_file_path( + cache_type: CacheType, + library_name: Optional[str] = None, + create_dir: bool = True, +) -> str: + """Get the canonical path for a cache file. + + Args: + cache_type: The type of cache file. + library_name: The library name (only used for MODEL and RECIPE types). + create_dir: Whether to create the parent directory if it does not exist. + + Returns: + The absolute path to the cache file in its canonical location. + """ + cache_base = get_cache_base_dir(create=create_dir) + subdir = _CACHE_SUBDIRS[cache_type] + cache_dir = os.path.join(cache_base, subdir) + + if create_dir: + os.makedirs(cache_dir, exist_ok=True) + + filename_template = _CACHE_FILENAMES[cache_type] + safe_name = _sanitize_library_name(library_name) + filename = filename_template.format(library_name=safe_name) + + return os.path.join(cache_dir, filename) + + +def get_legacy_cache_paths( + cache_type: CacheType, + library_name: Optional[str] = None, +) -> List[str]: + """Get a list of legacy cache file paths to check for migration. + + The paths are returned in order of priority (most recent first). + + Args: + cache_type: The type of cache file. + library_name: The library name (only used for MODEL and RECIPE types). + + Returns: + A list of potential legacy paths to check, in order of preference. + """ + try: + settings_dir = get_settings_dir(create=False) + except Exception: + settings_dir = get_project_root() + + safe_name = _sanitize_library_name(library_name) + legacy_paths: List[str] = [] + + if cache_type == CacheType.MODEL: + # Legacy per-library path: {settings_dir}/model_cache/{library}.sqlite + legacy_paths.append( + os.path.join(settings_dir, "model_cache", f"{safe_name}.sqlite") + ) + # Legacy root-level single cache (for "default" library only) + if safe_name.lower() in ("default", ""): + legacy_paths.append(os.path.join(settings_dir, "model_cache.sqlite")) + + elif cache_type == CacheType.RECIPE: + # Legacy per-library path: {settings_dir}/recipe_cache/{library}.sqlite + legacy_paths.append( + os.path.join(settings_dir, "recipe_cache", f"{safe_name}.sqlite") + ) + # Legacy root-level single cache (for "default" library only) + if safe_name.lower() in ("default", ""): + legacy_paths.append(os.path.join(settings_dir, "recipe_cache.sqlite")) + + elif cache_type == CacheType.RECIPE_FTS: + # Legacy root-level path + legacy_paths.append(os.path.join(settings_dir, "recipe_fts.sqlite")) + + elif cache_type == CacheType.TAG_FTS: + # Legacy root-level path + legacy_paths.append(os.path.join(settings_dir, "tag_fts.sqlite")) + + elif cache_type == CacheType.SYMLINK: + # Current location in cache/ but without subdirectory + legacy_paths.append( + os.path.join(settings_dir, "cache", "symlink_map.json") + ) + + return legacy_paths + + +def _cleanup_legacy_file_after_migration( + legacy_path: str, + canonical_path: str, +) -> bool: + """Safely remove a legacy file after successful migration. + + Args: + legacy_path: The legacy file path to remove. + canonical_path: The canonical path where the file was copied to. + + Returns: + True if cleanup succeeded, False otherwise. + """ + try: + if not os.path.exists(canonical_path): + logger.warning( + "Skipping cleanup of %s: canonical file not found at %s", + legacy_path, + canonical_path, + ) + return False + + legacy_size = os.path.getsize(legacy_path) + canonical_size = os.path.getsize(canonical_path) + if legacy_size != canonical_size: + logger.warning( + "Skipping cleanup of %s: file size mismatch (legacy=%d, canonical=%d)", + legacy_path, + legacy_size, + canonical_size, + ) + return False + + os.remove(legacy_path) + logger.info("Cleaned up legacy cache file: %s", legacy_path) + + _cleanup_empty_legacy_directories(legacy_path) + + return True + + except Exception as exc: + logger.warning( + "Failed to cleanup legacy cache file %s: %s", + legacy_path, + exc, + ) + return False + + +def _cleanup_empty_legacy_directories(legacy_path: str) -> None: + """Remove empty parent directories of a legacy file. + + This function only removes directories if they are empty, + using os.rmdir() which fails on non-empty directories. + + Args: + legacy_path: The legacy file path whose parent directories should be cleaned. + """ + try: + parent_dir = os.path.dirname(legacy_path) + + legacy_dir_names = ("model_cache", "recipe_cache") + + current = parent_dir + while current: + base_name = os.path.basename(current) + + if base_name in legacy_dir_names: + if os.path.isdir(current) and not os.listdir(current): + try: + os.rmdir(current) + logger.info("Removed empty legacy directory: %s", current) + except Exception: + pass + + parent = os.path.dirname(current) + if parent == current: + break + current = parent + + except Exception as exc: + logger.debug("Failed to cleanup empty legacy directories: %s", exc) + + +def resolve_cache_path_with_migration( + cache_type: CacheType, + library_name: Optional[str] = None, + env_override: Optional[str] = None, +) -> str: + """Resolve the cache file path, migrating from legacy locations if needed. + + This function performs lazy migration: on first access, it checks if the + file exists at the canonical location. If not, it looks for legacy files + and copies them to the new location. After successful migration, the + legacy file is automatically removed. + + Args: + cache_type: The type of cache file. + library_name: The library name (only used for MODEL and RECIPE types). + env_override: Optional environment variable value that overrides all + path resolution. When set, returns this path directly without + any migration. + + Returns: + The resolved path to use for the cache file. + """ + # Environment override bypasses all migration logic + if env_override: + return env_override + + canonical_path = get_cache_file_path(cache_type, library_name, create_dir=True) + + # If file already exists at canonical location, use it + if os.path.exists(canonical_path): + return canonical_path + + # Check legacy paths for migration + legacy_paths = get_legacy_cache_paths(cache_type, library_name) + + for legacy_path in legacy_paths: + if os.path.exists(legacy_path): + try: + shutil.copy2(legacy_path, canonical_path) + logger.info( + "Migrated %s cache from %s to %s", + cache_type.value, + legacy_path, + canonical_path, + ) + + _cleanup_legacy_file_after_migration(legacy_path, canonical_path) + + return canonical_path + except Exception as exc: + logger.warning( + "Failed to migrate %s cache from %s: %s", + cache_type.value, + legacy_path, + exc, + ) + + # No legacy file found; return canonical path (will be created fresh) + return canonical_path + + +def get_legacy_cache_files_for_cleanup() -> List[str]: + """Get a list of legacy cache files that can be removed after migration. + + This function returns files that exist in legacy locations and have + corresponding files in the new canonical locations. + + Returns: + A list of legacy file paths that are safe to remove. + """ + files_to_remove: List[str] = [] + + try: + settings_dir = get_settings_dir(create=False) + except Exception: + return files_to_remove + + # Check each cache type for migrated legacy files + for cache_type in CacheType: + # For MODEL and RECIPE, we need to check each library + if cache_type in (CacheType.MODEL, CacheType.RECIPE): + # Check default library + _check_legacy_for_cleanup(cache_type, "default", files_to_remove) + # Check for any per-library caches in legacy directories + legacy_dir_name = "model_cache" if cache_type == CacheType.MODEL else "recipe_cache" + legacy_dir = os.path.join(settings_dir, legacy_dir_name) + if os.path.isdir(legacy_dir): + try: + for filename in os.listdir(legacy_dir): + if filename.endswith(".sqlite"): + library_name = filename[:-7] # Remove .sqlite + _check_legacy_for_cleanup(cache_type, library_name, files_to_remove) + except Exception: + pass + else: + _check_legacy_for_cleanup(cache_type, None, files_to_remove) + + return files_to_remove + + +def _check_legacy_for_cleanup( + cache_type: CacheType, + library_name: Optional[str], + files_to_remove: List[str], +) -> None: + """Check if a legacy cache file can be removed after migration. + + Args: + cache_type: The type of cache file. + library_name: The library name (only used for MODEL and RECIPE types). + files_to_remove: List to append removable files to. + """ + canonical_path = get_cache_file_path(cache_type, library_name, create_dir=False) + if not os.path.exists(canonical_path): + return + + legacy_paths = get_legacy_cache_paths(cache_type, library_name) + for legacy_path in legacy_paths: + if os.path.exists(legacy_path) and legacy_path not in files_to_remove: + files_to_remove.append(legacy_path) + + +def cleanup_legacy_cache_files(dry_run: bool = True) -> List[str]: + """Remove legacy cache files that have been migrated. + + Args: + dry_run: If True, only return the list of files that would be removed + without actually removing them. + + Returns: + A list of files that were (or would be) removed. + """ + files = get_legacy_cache_files_for_cleanup() + + if dry_run or not files: + return files + + removed: List[str] = [] + for file_path in files: + try: + os.remove(file_path) + removed.append(file_path) + logger.info("Removed legacy cache file: %s", file_path) + except Exception as exc: + logger.warning("Failed to remove legacy cache file %s: %s", file_path, exc) + + # Try to remove empty legacy directories + try: + settings_dir = get_settings_dir(create=False) + for legacy_dir_name in ("model_cache", "recipe_cache"): + legacy_dir = os.path.join(settings_dir, legacy_dir_name) + if os.path.isdir(legacy_dir) and not os.listdir(legacy_dir): + os.rmdir(legacy_dir) + logger.info("Removed empty legacy directory: %s", legacy_dir) + except Exception: + pass + + return removed diff --git a/tests/config/test_symlink_cache.py b/tests/config/test_symlink_cache.py index c32dacb1..91fa738b 100644 --- a/tests/config/test_symlink_cache.py +++ b/tests/config/test_symlink_cache.py @@ -4,6 +4,7 @@ import os import pytest from py import config as config_module +from py.utils import cache_paths as cache_paths_module def _normalize(path: str) -> str: @@ -28,9 +29,14 @@ def _setup_paths(monkeypatch: pytest.MonkeyPatch, tmp_path): } return mapping.get(kind, []) + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths) monkeypatch.setattr(config_module, "standalone_mode", True) - monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir)) + monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir) + # Also patch cache_paths module which has its own import of get_settings_dir + monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir) return loras_dir, settings_dir @@ -57,7 +63,7 @@ def test_symlink_scan_skips_file_links(monkeypatch: pytest.MonkeyPatch, tmp_path normalized_file_real = _normalize(os.path.realpath(file_target)) assert normalized_file_real not in cfg._path_mappings - cache_path = settings_dir / "cache" / "symlink_map.json" + cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json" assert cache_path.exists() @@ -71,7 +77,7 @@ def test_symlink_cache_reuses_previous_scan(monkeypatch: pytest.MonkeyPatch, tmp first_cfg = config_module.Config() cached_mappings = dict(first_cfg._path_mappings) - cache_path = settings_dir / "cache" / "symlink_map.json" + cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json" assert cache_path.exists() def fail_scan(self): @@ -97,7 +103,7 @@ def test_symlink_cache_survives_noise_mtime(monkeypatch: pytest.MonkeyPatch, tmp noise_file = recipes_dir / "touchme.txt" first_cfg = config_module.Config() - cache_path = settings_dir / "cache" / "symlink_map.json" + cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json" assert cache_path.exists() # Update a noisy path to bump parent directory mtime @@ -164,9 +170,14 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path): } return mapping.get(kind, []) + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths) monkeypatch.setattr(config_module, "standalone_mode", True) - monkeypatch.setattr(config_module, "get_settings_dir", lambda create=True: str(settings_dir)) + monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir) + # Also patch cache_paths module which has its own import of get_settings_dir + monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir) cfg = config_module.Config() @@ -174,6 +185,65 @@ def test_symlink_roots_are_preserved(monkeypatch: pytest.MonkeyPatch, tmp_path): normalized_link = _normalize(str(loras_link)) assert cfg._path_mappings[normalized_real] == normalized_link - cache_path = settings_dir / "cache" / "symlink_map.json" + cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json" payload = json.loads(cache_path.read_text(encoding="utf-8")) assert payload["path_mappings"][normalized_real] == normalized_link + + +def test_legacy_symlink_cache_automatic_cleanup(monkeypatch: pytest.MonkeyPatch, tmp_path): + """Test that legacy symlink cache is automatically cleaned up after migration.""" + settings_dir = tmp_path / "settings" + loras_dir = tmp_path / "loras" + loras_dir.mkdir() + checkpoint_dir = tmp_path / "checkpoints" + checkpoint_dir.mkdir() + embedding_dir = tmp_path / "embeddings" + embedding_dir.mkdir() + + def fake_get_folder_paths(kind: str): + mapping = { + "loras": [str(loras_dir)], + "checkpoints": [str(checkpoint_dir)], + "unet": [], + "embeddings": [str(embedding_dir)], + } + return mapping.get(kind, []) + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr(config_module.folder_paths, "get_folder_paths", fake_get_folder_paths) + monkeypatch.setattr(config_module, "standalone_mode", True) + monkeypatch.setattr(config_module, "get_settings_dir", fake_get_settings_dir) + monkeypatch.setattr(cache_paths_module, "get_settings_dir", fake_get_settings_dir) + + # Create legacy symlink cache at old location + settings_dir.mkdir(parents=True, exist_ok=True) + legacy_cache_dir = settings_dir / "cache" + legacy_cache_dir.mkdir(exist_ok=True) + legacy_cache_path = legacy_cache_dir / "symlink_map.json" + + # Write some legacy cache data + legacy_data = { + "fingerprint": {"roots": []}, + "path_mappings": { + "/legacy/target": "/legacy/link" + } + } + legacy_cache_path.write_text(json.dumps(legacy_data), encoding="utf-8") + + # Verify legacy file exists + assert legacy_cache_path.exists() + + # Initialize Config - this should trigger migration and automatic cleanup + cfg = config_module.Config() + + # New canonical cache should exist + new_cache_path = settings_dir / "cache" / "symlink" / "symlink_map.json" + assert new_cache_path.exists() + + # Legacy file should be automatically cleaned up + assert not legacy_cache_path.exists() + + # Config should still work correctly + assert isinstance(cfg._path_mappings, dict) diff --git a/tests/services/test_settings_manager.py b/tests/services/test_settings_manager.py index e8298aa6..423ef5d8 100644 --- a/tests/services/test_settings_manager.py +++ b/tests/services/test_settings_manager.py @@ -37,7 +37,7 @@ def test_portable_settings_use_project_root(tmp_path, monkeypatch): monkeypatch.delenv("LORA_MANAGER_CACHE_DB", raising=False) cache = cache_module.PersistentModelCache(library_name="portable_lib") - expected_cache_path = tmp_path / "model_cache" / "portable_lib.sqlite" + expected_cache_path = tmp_path / "cache" / "model" / "portable_lib.sqlite" assert cache.get_database_path() == str(expected_cache_path) assert expected_cache_path.parent.is_dir() diff --git a/tests/utils/test_cache_paths.py b/tests/utils/test_cache_paths.py new file mode 100644 index 00000000..6a75033d --- /dev/null +++ b/tests/utils/test_cache_paths.py @@ -0,0 +1,529 @@ +"""Unit tests for the cache_paths module.""" + +import os +import shutil +import tempfile +from pathlib import Path + +import pytest + +from py.utils.cache_paths import ( + CacheType, + cleanup_legacy_cache_files, + get_cache_base_dir, + get_cache_file_path, + get_legacy_cache_files_for_cleanup, + get_legacy_cache_paths, + resolve_cache_path_with_migration, +) + + +class TestCacheType: + """Tests for the CacheType enum.""" + + def test_enum_values(self): + assert CacheType.MODEL.value == "model" + assert CacheType.RECIPE.value == "recipe" + assert CacheType.RECIPE_FTS.value == "recipe_fts" + assert CacheType.TAG_FTS.value == "tag_fts" + assert CacheType.SYMLINK.value == "symlink" + + +class TestGetCacheBaseDir: + """Tests for get_cache_base_dir function.""" + + def test_returns_cache_subdirectory(self): + cache_dir = get_cache_base_dir(create=True) + assert cache_dir.endswith("cache") + assert os.path.isdir(cache_dir) + + def test_creates_directory_when_requested(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + cache_dir = get_cache_base_dir(create=True) + assert os.path.isdir(cache_dir) + assert cache_dir == str(settings_dir / "cache") + + +class TestGetCacheFilePath: + """Tests for get_cache_file_path function.""" + + def test_model_cache_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = get_cache_file_path(CacheType.MODEL, "my_library", create_dir=True) + expected = settings_dir / "cache" / "model" / "my_library.sqlite" + assert path == str(expected) + assert os.path.isdir(expected.parent) + + def test_recipe_cache_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = get_cache_file_path(CacheType.RECIPE, "default", create_dir=True) + expected = settings_dir / "cache" / "recipe" / "default.sqlite" + assert path == str(expected) + + def test_recipe_fts_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = get_cache_file_path(CacheType.RECIPE_FTS, create_dir=True) + expected = settings_dir / "cache" / "fts" / "recipe_fts.sqlite" + assert path == str(expected) + + def test_tag_fts_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = get_cache_file_path(CacheType.TAG_FTS, create_dir=True) + expected = settings_dir / "cache" / "fts" / "tag_fts.sqlite" + assert path == str(expected) + + def test_symlink_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = get_cache_file_path(CacheType.SYMLINK, create_dir=True) + expected = settings_dir / "cache" / "symlink" / "symlink_map.json" + assert path == str(expected) + + def test_sanitizes_library_name(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = get_cache_file_path(CacheType.MODEL, "my/bad:name", create_dir=True) + assert "my_bad_name" in path + + def test_none_library_name_defaults_to_default(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = get_cache_file_path(CacheType.MODEL, None, create_dir=True) + assert "default.sqlite" in path + + +class TestGetLegacyCachePaths: + """Tests for get_legacy_cache_paths function.""" + + def test_model_legacy_paths_for_default(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + paths = get_legacy_cache_paths(CacheType.MODEL, "default") + assert len(paths) == 2 + assert str(settings_dir / "model_cache" / "default.sqlite") in paths + assert str(settings_dir / "model_cache.sqlite") in paths + + def test_model_legacy_paths_for_named_library(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + paths = get_legacy_cache_paths(CacheType.MODEL, "my_library") + assert len(paths) == 1 + assert str(settings_dir / "model_cache" / "my_library.sqlite") in paths + + def test_recipe_legacy_paths(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + paths = get_legacy_cache_paths(CacheType.RECIPE, "default") + assert len(paths) == 2 + assert str(settings_dir / "recipe_cache" / "default.sqlite") in paths + assert str(settings_dir / "recipe_cache.sqlite") in paths + + def test_recipe_fts_legacy_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + paths = get_legacy_cache_paths(CacheType.RECIPE_FTS) + assert len(paths) == 1 + assert str(settings_dir / "recipe_fts.sqlite") in paths + + def test_tag_fts_legacy_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + paths = get_legacy_cache_paths(CacheType.TAG_FTS) + assert len(paths) == 1 + assert str(settings_dir / "tag_fts.sqlite") in paths + + def test_symlink_legacy_path(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + paths = get_legacy_cache_paths(CacheType.SYMLINK) + assert len(paths) == 1 + assert str(settings_dir / "cache" / "symlink_map.json") in paths + + +class TestResolveCachePathWithMigration: + """Tests for resolve_cache_path_with_migration function.""" + + def test_returns_env_override_when_set(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + override_path = "/custom/path/cache.sqlite" + path = resolve_cache_path_with_migration( + CacheType.MODEL, + library_name="default", + env_override=override_path, + ) + assert path == override_path + + def test_returns_canonical_path_when_exists(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create the canonical path + canonical = settings_dir / "cache" / "model" / "default.sqlite" + canonical.parent.mkdir(parents=True) + canonical.write_text("existing") + + path = resolve_cache_path_with_migration(CacheType.MODEL, "default") + assert path == str(canonical) + + def test_migrates_from_legacy_root_level_cache(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create legacy cache at root level + legacy_path = settings_dir / "model_cache.sqlite" + legacy_path.write_text("legacy data") + + path = resolve_cache_path_with_migration(CacheType.MODEL, "default") + + # Should return canonical path + canonical = settings_dir / "cache" / "model" / "default.sqlite" + assert path == str(canonical) + + # File should be copied to canonical location + assert canonical.exists() + assert canonical.read_text() == "legacy data" + + # Legacy file should be automatically cleaned up + assert not legacy_path.exists() + + def test_migrates_from_legacy_per_library_cache(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create legacy per-library cache + legacy_dir = settings_dir / "model_cache" + legacy_dir.mkdir() + legacy_path = legacy_dir / "my_library.sqlite" + legacy_path.write_text("legacy library data") + + path = resolve_cache_path_with_migration(CacheType.MODEL, "my_library") + + # Should return canonical path + canonical = settings_dir / "cache" / "model" / "my_library.sqlite" + assert path == str(canonical) + assert canonical.exists() + assert canonical.read_text() == "legacy library data" + + # Legacy file should be automatically cleaned up + assert not legacy_path.exists() + + # Empty legacy directory should be cleaned up + assert not legacy_dir.exists() + + def test_prefers_per_library_over_root_for_migration(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create both legacy caches + legacy_root = settings_dir / "model_cache.sqlite" + legacy_root.write_text("root legacy") + + legacy_dir = settings_dir / "model_cache" + legacy_dir.mkdir() + legacy_lib = legacy_dir / "default.sqlite" + legacy_lib.write_text("library legacy") + + path = resolve_cache_path_with_migration(CacheType.MODEL, "default") + + canonical = settings_dir / "cache" / "model" / "default.sqlite" + assert path == str(canonical) + # Should migrate from per-library path (first in legacy list) + assert canonical.read_text() == "library legacy" + + def test_returns_canonical_path_when_no_legacy_exists(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + path = resolve_cache_path_with_migration(CacheType.MODEL, "new_library") + + canonical = settings_dir / "cache" / "model" / "new_library.sqlite" + assert path == str(canonical) + # Directory should be created + assert canonical.parent.exists() + # But file should not exist yet + assert not canonical.exists() + + +class TestLegacyCacheCleanup: + """Tests for legacy cache cleanup functions.""" + + def test_get_legacy_cache_files_for_cleanup(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create canonical and legacy files + canonical = settings_dir / "cache" / "model" / "default.sqlite" + canonical.parent.mkdir(parents=True) + canonical.write_text("canonical") + + legacy = settings_dir / "model_cache.sqlite" + legacy.write_text("legacy") + + files = get_legacy_cache_files_for_cleanup() + assert str(legacy) in files + + def test_cleanup_legacy_cache_files_dry_run(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create canonical and legacy files + canonical = settings_dir / "cache" / "model" / "default.sqlite" + canonical.parent.mkdir(parents=True) + canonical.write_text("canonical") + + legacy = settings_dir / "model_cache.sqlite" + legacy.write_text("legacy") + + removed = cleanup_legacy_cache_files(dry_run=True) + assert str(legacy) in removed + # File should still exist (dry run) + assert legacy.exists() + + def test_cleanup_legacy_cache_files_actual(self, tmp_path, monkeypatch): + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create canonical and legacy files + canonical = settings_dir / "cache" / "model" / "default.sqlite" + canonical.parent.mkdir(parents=True) + canonical.write_text("canonical") + + legacy = settings_dir / "model_cache.sqlite" + legacy.write_text("legacy") + + removed = cleanup_legacy_cache_files(dry_run=False) + assert str(legacy) in removed + # File should be deleted + assert not legacy.exists() + + +class TestAutomaticCleanup: + """Tests for automatic cleanup during migration.""" + + def test_automatic_cleanup_on_migration(self, tmp_path, monkeypatch): + """Test that legacy files are automatically cleaned up after migration.""" + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create a legacy cache file + legacy_dir = settings_dir / "model_cache" + legacy_dir.mkdir() + legacy_file = legacy_dir / "default.sqlite" + legacy_file.write_text("test data") + + # Verify legacy file exists + assert legacy_file.exists() + + # Trigger migration (this should auto-cleanup) + resolved_path = resolve_cache_path_with_migration(CacheType.MODEL, "default") + + # Verify canonical file exists + canonical_path = settings_dir / "cache" / "model" / "default.sqlite" + assert resolved_path == str(canonical_path) + assert canonical_path.exists() + assert canonical_path.read_text() == "test data" + + # Verify legacy file was cleaned up + assert not legacy_file.exists() + + # Verify empty directory was cleaned up + assert not legacy_dir.exists() + + def test_automatic_cleanup_with_verification(self, tmp_path, monkeypatch): + """Test that cleanup verifies file integrity before deletion.""" + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Create legacy cache + legacy_dir = settings_dir / "recipe_cache" + legacy_dir.mkdir() + legacy_file = legacy_dir / "my_library.sqlite" + legacy_file.write_text("data") + + # Trigger migration + resolved_path = resolve_cache_path_with_migration(CacheType.RECIPE, "my_library") + canonical_path = settings_dir / "cache" / "recipe" / "my_library.sqlite" + + # Both should exist initially (migration successful) + assert canonical_path.exists() + assert legacy_file.exists() is False # Auto-cleanup removes it + + # File content should match (integrity check) + assert canonical_path.read_text() == "data" + + # Directory should be cleaned up + assert not legacy_dir.exists() + + def test_automatic_cleanup_multiple_cache_types(self, tmp_path, monkeypatch): + """Test automatic cleanup for different cache types.""" + settings_dir = tmp_path / "settings" + settings_dir.mkdir() + + def fake_get_settings_dir(create: bool = True) -> str: + return str(settings_dir) + + monkeypatch.setattr("py.utils.cache_paths.get_settings_dir", fake_get_settings_dir) + + # Test RECIPE_FTS migration + legacy_fts = settings_dir / "recipe_fts.sqlite" + legacy_fts.write_text("fts data") + resolve_cache_path_with_migration(CacheType.RECIPE_FTS) + canonical_fts = settings_dir / "cache" / "fts" / "recipe_fts.sqlite" + + assert canonical_fts.exists() + assert not legacy_fts.exists() + + # Test TAG_FTS migration + legacy_tag = settings_dir / "tag_fts.sqlite" + legacy_tag.write_text("tag data") + resolve_cache_path_with_migration(CacheType.TAG_FTS) + canonical_tag = settings_dir / "cache" / "fts" / "tag_fts.sqlite" + + assert canonical_tag.exists() + assert not legacy_tag.exists()