fix(settings): sync portable mode toggle

This commit is contained in:
Will Miao
2025-11-16 17:36:52 +08:00
parent 645b7c247d
commit 354cf03bbc
16 changed files with 269 additions and 9 deletions

View File

@@ -1,12 +1,13 @@
import asyncio
import copy
import threading
import json
import os
from concurrent.futures import Future
import pytest
from py.services import service_registry
from py.services import settings_manager as settings_manager_module
from py.services.settings_manager import SettingsManager
from py.utils import settings_paths
@@ -163,6 +164,68 @@ def _create_manager_with_settings(tmp_path, monkeypatch, initial_settings, *, sa
return mgr
def _setup_storage_paths(tmp_path, monkeypatch):
project_root = tmp_path / "repo"
project_root.mkdir(parents=True, exist_ok=True)
user_dir = tmp_path / "user_config"
user_dir.mkdir(parents=True, exist_ok=True)
user_settings_path = user_dir / "settings.json"
monkeypatch.setattr(
"py.services.settings_manager.ensure_settings_file",
lambda logger=None: str(user_settings_path),
)
monkeypatch.setattr(
settings_manager_module,
"user_config_dir",
lambda *args, **kwargs: str(user_dir),
)
monkeypatch.setattr(settings_paths, "get_project_root", lambda: str(project_root))
return project_root, user_dir, user_settings_path
def _populate_cache(root_dir, marker_name, db_text):
cache_dir = root_dir / "model_cache"
cache_dir.mkdir(exist_ok=True)
marker_file = cache_dir / marker_name
marker_file.write_text(marker_name, encoding="utf-8")
(root_dir / "model_cache.sqlite").write_text(db_text, encoding="utf-8")
def test_switch_to_portable_mode_copies_cache(tmp_path, monkeypatch):
project_root, user_dir, user_settings = _setup_storage_paths(tmp_path, monkeypatch)
_populate_cache(user_dir, "user_marker.txt", "user_db")
manager = SettingsManager()
manager.set("use_portable_settings", True)
assert manager.settings_file == str(project_root / "settings.json")
marker_copy = project_root / "model_cache" / "user_marker.txt"
assert marker_copy.read_text(encoding="utf-8") == "user_marker.txt"
assert (project_root / "model_cache.sqlite").read_text(encoding="utf-8") == "user_db"
assert user_settings.exists()
def test_switching_back_to_user_config_moves_cache(tmp_path, monkeypatch):
project_root, user_dir, user_settings = _setup_storage_paths(tmp_path, monkeypatch)
_populate_cache(user_dir, "user_marker.txt", "user_db")
manager = SettingsManager()
manager.set("use_portable_settings", True)
project_cache_dir = project_root / "model_cache"
project_cache_dir.mkdir(exist_ok=True)
(project_cache_dir / "project_marker.txt").write_text("project_marker", encoding="utf-8")
(project_root / "model_cache.sqlite").write_text("project_db", encoding="utf-8")
manager.set("use_portable_settings", False)
assert manager.settings_file == str(user_settings)
assert (user_dir / "model_cache" / "project_marker.txt").read_text(encoding="utf-8") == "project_marker"
assert (user_dir / "model_cache.sqlite").read_text(encoding="utf-8") == "project_db"
def test_download_path_template_parses_json_string(manager):
templates = {"lora": "{author}", "checkpoint": "{author}", "embedding": "{author}"}
manager.settings["download_path_templates"] = json.dumps(templates)
@@ -212,8 +275,7 @@ def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
manager = _create_manager_with_settings(tmp_path, monkeypatch, initial)
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()
loop._thread_id = 1
class DummyScanner:
def __init__(self):
@@ -227,12 +289,16 @@ def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
dispatched_loops = []
futures = []
original_run_coroutine_threadsafe = asyncio.run_coroutine_threadsafe
def tracking_run_coroutine_threadsafe(coro, target_loop):
dispatched_loops.append(target_loop)
future = original_run_coroutine_threadsafe(coro, target_loop)
future = Future()
futures.append(future)
try:
result = asyncio.run(coro)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(result)
return future
def fake_get_service_sync(cls, name):
@@ -254,8 +320,7 @@ def test_model_name_display_setting_notifies_scanners(tmp_path, monkeypatch):
assert dummy_scanner.calls == ["file_name"]
assert dispatched_loops == [dummy_scanner.loop]
finally:
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=1)
loop._thread_id = None
loop.close()