feat: add WSL and Docker support for file location opening

- Add WSL detection and Windows path conversion using wslpath
- Add Docker/Kubernetes detection via /.dockerenv and /proc/1/cgroup
- Implement clipboard fallback for containerized environments
- Update open_file_location handler to detect WSL/Docker before POSIX
- Update open_settings_location handler with same detection logic
- Add clipboard API integration with graceful fallback in frontend
- Add translations for clipboard feature across all 10 languages
- Add unit tests for _is_wsl(), _is_docker(), and _wsl_to_windows_path()

Fixes file manager opening failures in WSL and Docker environments.
This commit is contained in:
Will Miao
2026-01-14 15:49:35 +08:00
parent 73f2a34d08
commit 4951ff358e
14 changed files with 680 additions and 130 deletions

View File

@@ -233,7 +233,9 @@
"label": "Einstellungsordner öffnen", "label": "Einstellungsordner öffnen",
"tooltip": "Den Ordner mit der settings.json öffnen", "tooltip": "Den Ordner mit der settings.json öffnen",
"success": "Einstellungsordner geöffnet", "success": "Einstellungsordner geöffnet",
"failed": "Einstellungsordner konnte nicht geöffnet werden" "failed": "Einstellungsordner konnte nicht geöffnet werden",
"copied": "Einstellungspfad in die Zwischenablage kopiert: {{path}}",
"clipboardFallback": "Einstellungspfad: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "Inhaltsfilterung", "contentFiltering": "Inhaltsfilterung",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "Dateispeicherort erfolgreich geöffnet", "success": "Dateispeicherort erfolgreich geöffnet",
"failed": "Fehler beim Öffnen des Dateispeicherorts" "failed": "Fehler beim Öffnen des Dateispeicherorts",
"copied": "Pfad in die Zwischenablage kopiert: {{path}}",
"clipboardFallback": "Pfad: {{path}}"
}, },
"metadata": { "metadata": {
"version": "Version", "version": "Version",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -231,9 +231,11 @@
"civitaiApiKeyHelp": "Used for authentication when downloading models from Civitai", "civitaiApiKeyHelp": "Used for authentication when downloading models from Civitai",
"openSettingsFileLocation": { "openSettingsFileLocation": {
"label": "Open settings folder", "label": "Open settings folder",
"tooltip": "Open the folder containing settings.json", "tooltip": "Open folder containing settings.json",
"success": "Opened settings.json folder", "success": "Opened settings.json folder",
"failed": "Failed to open settings.json folder" "failed": "Failed to open settings.json folder",
"copied": "Settings path copied to clipboard: {{path}}",
"clipboardFallback": "Settings path: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "Content Filtering", "contentFiltering": "Content Filtering",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "File location opened successfully", "success": "File location opened successfully",
"failed": "Failed to open file location" "failed": "Failed to open file location",
"copied": "Path copied to clipboard: {{path}}",
"clipboardFallback": "Path: {{path}}"
}, },
"metadata": { "metadata": {
"version": "Version", "version": "Version",

View File

@@ -233,7 +233,9 @@
"label": "Abrir carpeta de ajustes", "label": "Abrir carpeta de ajustes",
"tooltip": "Abrir la carpeta que contiene settings.json", "tooltip": "Abrir la carpeta que contiene settings.json",
"success": "Carpeta de settings.json abierta", "success": "Carpeta de settings.json abierta",
"failed": "No se pudo abrir la carpeta de settings.json" "failed": "No se pudo abrir la carpeta de settings.json",
"copied": "Ruta de configuración copiada al portapapeles: {{path}}",
"clipboardFallback": "Ruta de configuración: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "Filtrado de contenido", "contentFiltering": "Filtrado de contenido",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "Ubicación del archivo abierta exitosamente", "success": "Ubicación del archivo abierta exitosamente",
"failed": "Error al abrir la ubicación del archivo" "failed": "Error al abrir la ubicación del archivo",
"copied": "Ruta copiada al portapapeles: {{path}}",
"clipboardFallback": "Ruta: {{path}}"
}, },
"metadata": { "metadata": {
"version": "Versión", "version": "Versión",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -233,7 +233,9 @@
"label": "Ouvrir le dossier des paramètres", "label": "Ouvrir le dossier des paramètres",
"tooltip": "Ouvrir le dossier contenant settings.json", "tooltip": "Ouvrir le dossier contenant settings.json",
"success": "Dossier settings.json ouvert", "success": "Dossier settings.json ouvert",
"failed": "Impossible d'ouvrir le dossier settings.json" "failed": "Impossible d'ouvrir le dossier settings.json",
"copied": "Chemin des paramètres copié dans le presse-papiers: {{path}}",
"clipboardFallback": "Chemin des paramètres: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "Filtrage du contenu", "contentFiltering": "Filtrage du contenu",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "Emplacement du fichier ouvert avec succès", "success": "Emplacement du fichier ouvert avec succès",
"failed": "Échec de l'ouverture de l'emplacement du fichier" "failed": "Échec de l'ouverture de l'emplacement du fichier",
"copied": "Chemin copié dans le presse-papiers: {{path}}",
"clipboardFallback": "Chemin: {{path}}"
}, },
"metadata": { "metadata": {
"version": "Version", "version": "Version",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -233,7 +233,9 @@
"label": "פתח תיקיית הגדרות", "label": "פתח תיקיית הגדרות",
"tooltip": "פתח את התיקייה שמכילה את settings.json", "tooltip": "פתח את התיקייה שמכילה את settings.json",
"success": "תיקיית settings.json נפתחה", "success": "תיקיית settings.json נפתחה",
"failed": "לא ניתן לפתוח את תיקיית settings.json" "failed": "לא ניתן לפתוח את תיקיית settings.json",
"copied": "נתיב ההגדרות הועתק ללוח העריכה: {{path}}",
"clipboardFallback": "נתיב ההגדרות: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "סינון תוכן", "contentFiltering": "סינון תוכן",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "מיקום הקובץ נפתח בהצלחה", "success": "מיקום הקובץ נפתח בהצלחה",
"failed": "פתיחת מיקום הקובץ נכשלה" "failed": "פתיחת מיקום הקובץ נכשלה",
"copied": "הנתיב הועתק ללוח העריכה: {{path}}",
"clipboardFallback": "נתיב: {{path}}"
}, },
"metadata": { "metadata": {
"version": "גרסה", "version": "גרסה",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -233,7 +233,9 @@
"label": "設定フォルダーを開く", "label": "設定フォルダーを開く",
"tooltip": "settings.json を含むフォルダーを開きます", "tooltip": "settings.json を含むフォルダーを開きます",
"success": "settings.json フォルダーを開きました", "success": "settings.json フォルダーを開きました",
"failed": "settings.json フォルダーを開けませんでした" "failed": "settings.json フォルダーを開けませんでした",
"copied": "設定パスをクリップボードにコピーしました: {{path}}",
"clipboardFallback": "設定パス: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "コンテンツフィルタリング", "contentFiltering": "コンテンツフィルタリング",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "ファイルの場所を正常に開きました", "success": "ファイルの場所を正常に開きました",
"failed": "ファイルの場所を開くのに失敗しました" "failed": "ファイルの場所を開くのに失敗しました",
"copied": "パスをクリップボードにコピーしました: {{path}}",
"clipboardFallback": "パス: {{path}}"
}, },
"metadata": { "metadata": {
"version": "バージョン", "version": "バージョン",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -233,7 +233,9 @@
"label": "설정 폴더 열기", "label": "설정 폴더 열기",
"tooltip": "settings.json이 있는 폴더를 엽니다", "tooltip": "settings.json이 있는 폴더를 엽니다",
"success": "settings.json 폴더를 열었습니다", "success": "settings.json 폴더를 열었습니다",
"failed": "settings.json 폴더를 열지 못했습니다" "failed": "settings.json 폴더를 열지 못했습니다",
"copied": "설정 경로가 클립보드에 복사되었습니다: {{path}}",
"clipboardFallback": "설정 경로: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "콘텐츠 필터링", "contentFiltering": "콘텐츠 필터링",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "파일 위치가 성공적으로 열렸습니다", "success": "파일 위치가 성공적으로 열렸습니다",
"failed": "파일 위치 열기에 실패했습니다" "failed": "파일 위치 열기에 실패했습니다",
"copied": "경로가 클립보드에 복사되었습니다: {{path}}",
"clipboardFallback": "경로: {{path}}"
}, },
"metadata": { "metadata": {
"version": "버전", "version": "버전",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -233,7 +233,9 @@
"label": "Открыть папку настроек", "label": "Открыть папку настроек",
"tooltip": "Открыть папку, содержащую settings.json", "tooltip": "Открыть папку, содержащую settings.json",
"success": "Папка settings.json открыта", "success": "Папка settings.json открыта",
"failed": "Не удалось открыть папку settings.json" "failed": "Не удалось открыть папку settings.json",
"copied": "Путь настроек скопирован в буфер обмена: {{path}}",
"clipboardFallback": "Путь настроек: {{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "Фильтрация контента", "contentFiltering": "Фильтрация контента",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "Расположение файла успешно открыто", "success": "Расположение файла успешно открыто",
"failed": "Не удалось открыть расположение файла" "failed": "Не удалось открыть расположение файла",
"copied": "Путь скопирован в буфер обмена: {{path}}",
"clipboardFallback": "Путь: {{path}}"
}, },
"metadata": { "metadata": {
"version": "Версия", "version": "Версия",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -233,7 +233,9 @@
"label": "打开设置文件夹", "label": "打开设置文件夹",
"tooltip": "打开包含 settings.json 的文件夹", "tooltip": "打开包含 settings.json 的文件夹",
"success": "已打开 settings.json 文件夹", "success": "已打开 settings.json 文件夹",
"failed": "无法打开 settings.json 文件夹" "failed": "无法打开 settings.json 文件夹",
"copied": "设置路径已复制到剪贴板:{{path}}",
"clipboardFallback": "设置路径:{{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "内容过滤", "contentFiltering": "内容过滤",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "文件位置已成功打开", "success": "文件位置已成功打开",
"failed": "打开文件位置失败" "failed": "打开文件位置失败",
"copied": "路径已复制到剪贴板:{{path}}",
"clipboardFallback": "路径:{{path}}"
}, },
"metadata": { "metadata": {
"version": "版本", "version": "版本",

View File

@@ -233,7 +233,9 @@
"label": "開啟設定資料夾", "label": "開啟設定資料夾",
"tooltip": "開啟包含 settings.json 的資料夾", "tooltip": "開啟包含 settings.json 的資料夾",
"success": "已開啟 settings.json 資料夾", "success": "已開啟 settings.json 資料夾",
"failed": "無法開啟 settings.json 資料夾" "failed": "無法開啟 settings.json 資料夾",
"copied": "設定路徑已複製到剪貼簿:{{path}}",
"clipboardFallback": "設定路徑:{{path}}"
}, },
"sections": { "sections": {
"contentFiltering": "內容過濾", "contentFiltering": "內容過濾",
@@ -895,7 +897,9 @@
}, },
"openFileLocation": { "openFileLocation": {
"success": "檔案位置已成功開啟", "success": "檔案位置已成功開啟",
"failed": "開啟檔案位置失敗" "failed": "開啟檔案位置失敗",
"copied": "路徑已複製到剪貼簿:{{path}}",
"clipboardFallback": "路徑:{{path}}"
}, },
"metadata": { "metadata": {
"version": "版本", "version": "版本",
@@ -1531,4 +1535,4 @@
"learnMore": "LM Civitai Extension Tutorial" "learnMore": "LM Civitai Extension Tutorial"
} }
} }
} }

View File

@@ -43,12 +43,55 @@ from ...utils.usage_stats import UsageStats
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _is_wsl() -> bool:
"""Check if running in WSL environment."""
try:
with open("/proc/version", "r") as f:
version_info = f.read().lower()
return "microsoft" in version_info or "wsl" in version_info
except (OSError, IOError):
return False
def _is_docker() -> bool:
"""Check if running in Docker container."""
dockerenv_exists = os.path.exists("/.dockerenv")
if dockerenv_exists:
return True
try:
with open("/proc/1/cgroup", "r") as f:
cgroup_content = f.read()
return (
"docker" in cgroup_content.lower()
or "kubepods" in cgroup_content.lower()
)
except (OSError, IOError):
return False
def _wsl_to_windows_path(wsl_path: str) -> str | None:
"""Convert WSL path to Windows path using wslpath."""
try:
result = subprocess.run(
["wslpath", "-w", wsl_path],
capture_output=True,
text=True,
check=True,
)
return result.stdout.strip()
except (subprocess.CalledProcessError, FileNotFoundError, OSError):
return None
class PromptServerProtocol(Protocol): class PromptServerProtocol(Protocol):
"""Subset of PromptServer used by the handlers.""" """Subset of PromptServer used by the handlers."""
instance: "PromptServerProtocol" instance: "PromptServerProtocol"
def send_sync(self, event: str, payload: dict) -> None: # pragma: no cover - protocol def send_sync(
self, event: str, payload: dict
) -> None: # pragma: no cover - protocol
... ...
@@ -63,7 +106,9 @@ class UsageStatsFactory(Protocol):
class MetadataProviderProtocol(Protocol): class MetadataProviderProtocol(Protocol):
async def get_model_versions(self, model_id: int) -> dict | None: # pragma: no cover - protocol async def get_model_versions(
self, model_id: int
) -> dict | None: # pragma: no cover - protocol
... ...
@@ -109,7 +154,11 @@ class NodeRegistry:
raw_widget_names: list | None = node.get("widget_names") raw_widget_names: list | None = node.get("widget_names")
if not isinstance(raw_widget_names, list): if not isinstance(raw_widget_names, list):
capability_widget_names = capabilities.get("widget_names") capability_widget_names = capabilities.get("widget_names")
raw_widget_names = capability_widget_names if isinstance(capability_widget_names, list) else None raw_widget_names = (
capability_widget_names
if isinstance(capability_widget_names, list)
else None
)
widget_names: list[str] = [] widget_names: list[str] = []
if isinstance(raw_widget_names, list): if isinstance(raw_widget_names, list):
@@ -205,14 +254,25 @@ class SettingsHandler:
"auto_organize_exclusions", "auto_organize_exclusions",
) )
_PROXY_KEYS = {"proxy_enabled", "proxy_host", "proxy_port", "proxy_username", "proxy_password", "proxy_type"} _PROXY_KEYS = {
"proxy_enabled",
"proxy_host",
"proxy_port",
"proxy_username",
"proxy_password",
"proxy_type",
}
def __init__( def __init__(
self, self,
*, *,
settings_service=None, settings_service=None,
metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers, metadata_provider_updater: Callable[
downloader_factory: Callable[[], Awaitable[DownloaderProtocol]] = get_downloader, [], Awaitable[None]
] = update_metadata_providers,
downloader_factory: Callable[
[], Awaitable[DownloaderProtocol]
] = get_downloader,
) -> None: ) -> None:
self._settings = settings_service or get_settings_manager() self._settings = settings_service or get_settings_manager()
self._metadata_provider_updater = metadata_provider_updater self._metadata_provider_updater = metadata_provider_updater
@@ -248,11 +308,13 @@ class SettingsHandler:
response_data["settings_file"] = settings_file response_data["settings_file"] = settings_file
messages_getter = getattr(self._settings, "get_startup_messages", None) messages_getter = getattr(self._settings, "get_startup_messages", None)
messages = list(messages_getter()) if callable(messages_getter) else [] messages = list(messages_getter()) if callable(messages_getter) else []
return web.json_response({ return web.json_response(
"success": True, {
"settings": response_data, "success": True,
"messages": messages, "settings": response_data,
}) "messages": messages,
}
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error getting settings: %s", exc, exc_info=True) logger.error("Error getting settings: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -271,8 +333,12 @@ class SettingsHandler:
try: try:
data = await request.json() data = await request.json()
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error parsing activate library request: %s", exc, exc_info=True) logger.error(
return web.json_response({"success": False, "error": "Invalid JSON payload"}, status=400) "Error parsing activate library request: %s", exc, exc_info=True
)
return web.json_response(
{"success": False, "error": "Invalid JSON payload"}, status=400
)
library_name = data.get("library") or data.get("library_name") library_name = data.get("library") or data.get("library_name")
if not isinstance(library_name, str) or not library_name.strip(): if not isinstance(library_name, str) or not library_name.strip():
@@ -297,7 +363,9 @@ class SettingsHandler:
logger.debug("Attempted to activate unknown library '%s'", library_name) logger.debug("Attempted to activate unknown library '%s'", library_name)
return web.json_response({"success": False, "error": str(exc)}, status=404) return web.json_response({"success": False, "error": str(exc)}, status=404)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error activating library '%s': %s", library_name, exc, exc_info=True) logger.error(
"Error activating library '%s': %s", library_name, exc, exc_info=True
)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
async def update_settings(self, request: web.Request) -> web.Response: async def update_settings(self, request: web.Request) -> web.Response:
@@ -312,9 +380,14 @@ class SettingsHandler:
if key == "example_images_path" and value: if key == "example_images_path" and value:
validation_error = self._validate_example_images_path(value) validation_error = self._validate_example_images_path(value)
if validation_error: if validation_error:
return web.json_response({"success": False, "error": validation_error}) return web.json_response(
{"success": False, "error": validation_error}
)
if value == "__DELETE__" and key in ("proxy_username", "proxy_password"): if value == "__DELETE__" and key in (
"proxy_username",
"proxy_password",
):
self._settings.delete(key) self._settings.delete(key)
else: else:
self._settings.set(key, value) self._settings.set(key, value)
@@ -356,7 +429,9 @@ class UsageStatsHandler:
data = await request.json() data = await request.json()
prompt_id = data.get("prompt_id") prompt_id = data.get("prompt_id")
if not prompt_id: if not prompt_id:
return web.json_response({"success": False, "error": "Missing prompt_id"}, status=400) return web.json_response(
{"success": False, "error": "Missing prompt_id"}, status=400
)
usage_stats = self._usage_stats_factory() usage_stats = self._usage_stats_factory()
await usage_stats.process_execution(prompt_id) await usage_stats.process_execution(prompt_id)
return web.json_response({"success": True}) return web.json_response({"success": True})
@@ -387,18 +462,24 @@ class LoraCodeHandler:
mode = data.get("mode", "append") mode = data.get("mode", "append")
if not lora_code: if not lora_code:
return web.json_response({"success": False, "error": "Missing lora_code parameter"}, status=400) return web.json_response(
{"success": False, "error": "Missing lora_code parameter"},
status=400,
)
results = [] results = []
if node_ids is None: if node_ids is None:
try: try:
self._prompt_server.instance.send_sync( self._prompt_server.instance.send_sync(
"lora_code_update", {"id": -1, "lora_code": lora_code, "mode": mode} "lora_code_update",
{"id": -1, "lora_code": lora_code, "mode": mode},
) )
results.append({"node_id": "broadcast", "success": True}) results.append({"node_id": "broadcast", "success": True})
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error broadcasting lora code: %s", exc) logger.error("Error broadcasting lora code: %s", exc)
results.append({"node_id": "broadcast", "success": False, "error": str(exc)}) results.append(
{"node_id": "broadcast", "success": False, "error": str(exc)}
)
else: else:
for entry in node_ids: for entry in node_ids:
node_identifier = entry node_identifier = entry
@@ -471,11 +552,19 @@ class TrainedWordsHandler:
try: try:
file_path = request.query.get("file_path") file_path = request.query.get("file_path")
if not file_path: if not file_path:
return web.json_response({"success": False, "error": "Missing file_path parameter"}, status=400) return web.json_response(
{"success": False, "error": "Missing file_path parameter"},
status=400,
)
if not os.path.exists(file_path): if not os.path.exists(file_path):
return web.json_response({"success": False, "error": "File not found"}, status=404) return web.json_response(
{"success": False, "error": "File not found"}, status=404
)
if not file_path.endswith(".safetensors"): if not file_path.endswith(".safetensors"):
return web.json_response({"success": False, "error": "File must be a safetensors file"}, status=400) return web.json_response(
{"success": False, "error": "File must be a safetensors file"},
status=400,
)
trained_words, class_tokens = await extract_trained_words(file_path) trained_words, class_tokens = await extract_trained_words(file_path)
return web.json_response( return web.json_response(
@@ -495,10 +584,15 @@ class ModelExampleFilesHandler:
try: try:
model_path = request.query.get("model_path") model_path = request.query.get("model_path")
if not model_path: if not model_path:
return web.json_response({"success": False, "error": "Missing model_path parameter"}, status=400) return web.json_response(
{"success": False, "error": "Missing model_path parameter"},
status=400,
)
model_dir = os.path.dirname(model_path) model_dir = os.path.dirname(model_path)
if not os.path.exists(model_dir): if not os.path.exists(model_dir):
return web.json_response({"success": False, "error": "Model directory not found"}, status=404) return web.json_response(
{"success": False, "error": "Model directory not found"}, status=404
)
base_name = os.path.splitext(os.path.basename(model_path))[0] base_name = os.path.splitext(os.path.basename(model_path))[0]
files = [] files = []
@@ -510,7 +604,10 @@ class ModelExampleFilesHandler:
if not os.path.isfile(file_full_path): if not os.path.isfile(file_full_path):
continue continue
file_ext = os.path.splitext(file)[1].lower() file_ext = os.path.splitext(file)[1].lower()
if file_ext not in SUPPORTED_MEDIA_EXTENSIONS["images"] and file_ext not in SUPPORTED_MEDIA_EXTENSIONS["videos"]: if (
file_ext not in SUPPORTED_MEDIA_EXTENSIONS["images"]
and file_ext not in SUPPORTED_MEDIA_EXTENSIONS["videos"]
):
continue continue
try: try:
index = int(file[len(pattern) :].split(".")[0]) index = int(file[len(pattern) :].split(".")[0])
@@ -545,7 +642,13 @@ class ServiceRegistryAdapter:
class ModelLibraryHandler: class ModelLibraryHandler:
def __init__(self, service_registry: ServiceRegistryAdapter, metadata_provider_factory: Callable[[], Awaitable[MetadataProviderProtocol | None]]) -> None: def __init__(
self,
service_registry: ServiceRegistryAdapter,
metadata_provider_factory: Callable[
[], Awaitable[MetadataProviderProtocol | None]
],
) -> None:
self._service_registry = service_registry self._service_registry = service_registry
self._metadata_provider_factory = metadata_provider_factory self._metadata_provider_factory = metadata_provider_factory
@@ -554,11 +657,17 @@ class ModelLibraryHandler:
model_id_str = request.query.get("modelId") model_id_str = request.query.get("modelId")
model_version_id_str = request.query.get("modelVersionId") model_version_id_str = request.query.get("modelVersionId")
if not model_id_str: if not model_id_str:
return web.json_response({"success": False, "error": "Missing required parameter: modelId"}, status=400) return web.json_response(
{"success": False, "error": "Missing required parameter: modelId"},
status=400,
)
try: try:
model_id = int(model_id_str) model_id = int(model_id_str)
except ValueError: except ValueError:
return web.json_response({"success": False, "error": "Parameter modelId must be an integer"}, status=400) return web.json_response(
{"success": False, "error": "Parameter modelId must be an integer"},
status=400,
)
lora_scanner = await self._service_registry.get_lora_scanner() lora_scanner = await self._service_registry.get_lora_scanner()
checkpoint_scanner = await self._service_registry.get_checkpoint_scanner() checkpoint_scanner = await self._service_registry.get_checkpoint_scanner()
@@ -568,29 +677,55 @@ class ModelLibraryHandler:
try: try:
model_version_id = int(model_version_id_str) model_version_id = int(model_version_id_str)
except ValueError: except ValueError:
return web.json_response({"success": False, "error": "Parameter modelVersionId must be an integer"}, status=400) return web.json_response(
{
"success": False,
"error": "Parameter modelVersionId must be an integer",
},
status=400,
)
exists = False exists = False
model_type = None model_type = None
if await lora_scanner.check_model_version_exists(model_version_id): if await lora_scanner.check_model_version_exists(model_version_id):
exists = True exists = True
model_type = "lora" model_type = "lora"
elif checkpoint_scanner and await checkpoint_scanner.check_model_version_exists(model_version_id): elif (
checkpoint_scanner
and await checkpoint_scanner.check_model_version_exists(
model_version_id
)
):
exists = True exists = True
model_type = "checkpoint" model_type = "checkpoint"
elif embedding_scanner and await embedding_scanner.check_model_version_exists(model_version_id): elif (
embedding_scanner
and await embedding_scanner.check_model_version_exists(
model_version_id
)
):
exists = True exists = True
model_type = "embedding" model_type = "embedding"
return web.json_response({"success": True, "exists": exists, "modelType": model_type if exists else None}) return web.json_response(
{
"success": True,
"exists": exists,
"modelType": model_type if exists else None,
}
)
lora_versions = await lora_scanner.get_model_versions_by_id(model_id) lora_versions = await lora_scanner.get_model_versions_by_id(model_id)
checkpoint_versions = [] checkpoint_versions = []
embedding_versions = [] embedding_versions = []
if not lora_versions and checkpoint_scanner: if not lora_versions and checkpoint_scanner:
checkpoint_versions = await checkpoint_scanner.get_model_versions_by_id(model_id) checkpoint_versions = await checkpoint_scanner.get_model_versions_by_id(
model_id
)
if not lora_versions and not checkpoint_versions and embedding_scanner: if not lora_versions and not checkpoint_versions and embedding_scanner:
embedding_versions = await embedding_scanner.get_model_versions_by_id(model_id) embedding_versions = await embedding_scanner.get_model_versions_by_id(
model_id
)
model_type = None model_type = None
versions = [] versions = []
@@ -604,7 +739,9 @@ class ModelLibraryHandler:
model_type = "embedding" model_type = "embedding"
versions = embedding_versions versions = embedding_versions
return web.json_response({"success": True, "modelType": model_type, "versions": versions}) return web.json_response(
{"success": True, "modelType": model_type, "versions": versions}
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Failed to check model existence: %s", exc, exc_info=True) logger.error("Failed to check model existence: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -613,22 +750,35 @@ class ModelLibraryHandler:
try: try:
model_id_str = request.query.get("modelId") model_id_str = request.query.get("modelId")
if not model_id_str: if not model_id_str:
return web.json_response({"success": False, "error": "Missing required parameter: modelId"}, status=400) return web.json_response(
{"success": False, "error": "Missing required parameter: modelId"},
status=400,
)
try: try:
model_id = int(model_id_str) model_id = int(model_id_str)
except ValueError: except ValueError:
return web.json_response({"success": False, "error": "Parameter modelId must be an integer"}, status=400) return web.json_response(
{"success": False, "error": "Parameter modelId must be an integer"},
status=400,
)
metadata_provider = await self._metadata_provider_factory() metadata_provider = await self._metadata_provider_factory()
if not metadata_provider: if not metadata_provider:
return web.json_response({"success": False, "error": "Metadata provider not available"}, status=503) return web.json_response(
{"success": False, "error": "Metadata provider not available"},
status=503,
)
try: try:
response = await metadata_provider.get_model_versions(model_id) response = await metadata_provider.get_model_versions(model_id)
except ResourceNotFoundError: except ResourceNotFoundError:
return web.json_response({"success": False, "error": "Model not found"}, status=404) return web.json_response(
{"success": False, "error": "Model not found"}, status=404
)
if not response or not response.get("modelVersions"): if not response or not response.get("modelVersions"):
return web.json_response({"success": False, "error": "Model not found"}, status=404) return web.json_response(
{"success": False, "error": "Model not found"}, status=404
)
versions = response.get("modelVersions", []) versions = response.get("modelVersions", [])
model_name = response.get("name", "") model_name = response.get("name", "")
@@ -646,10 +796,22 @@ class ModelLibraryHandler:
scanner = await self._service_registry.get_embedding_scanner() scanner = await self._service_registry.get_embedding_scanner()
normalized_type = "embedding" normalized_type = "embedding"
else: else:
return web.json_response({"success": False, "error": f'Model type "{model_type}" is not supported'}, status=400) return web.json_response(
{
"success": False,
"error": f'Model type "{model_type}" is not supported',
},
status=400,
)
if not scanner: if not scanner:
return web.json_response({"success": False, "error": f'Scanner for type "{normalized_type}" is not available'}, status=503) return web.json_response(
{
"success": False,
"error": f'Scanner for type "{normalized_type}" is not available',
},
status=503,
)
local_versions = await scanner.get_model_versions_by_id(model_id) local_versions = await scanner.get_model_versions_by_id(model_id)
local_version_ids = {version["versionId"] for version in local_versions} local_version_ids = {version["versionId"] for version in local_versions}
@@ -661,7 +823,9 @@ class ModelLibraryHandler:
{ {
"id": version_id, "id": version_id,
"name": version.get("name", ""), "name": version.get("name", ""),
"thumbnailUrl": version.get("images")[0]["url"] if version.get("images") else None, "thumbnailUrl": version.get("images")[0]["url"]
if version.get("images")
else None,
"inLibrary": version_id in local_version_ids, "inLibrary": version_id in local_version_ids,
} }
) )
@@ -683,19 +847,34 @@ class ModelLibraryHandler:
try: try:
username = request.query.get("username") username = request.query.get("username")
if not username: if not username:
return web.json_response({"success": False, "error": "Missing required parameter: username"}, status=400) return web.json_response(
{"success": False, "error": "Missing required parameter: username"},
status=400,
)
metadata_provider = await self._metadata_provider_factory() metadata_provider = await self._metadata_provider_factory()
if not metadata_provider: if not metadata_provider:
return web.json_response({"success": False, "error": "Metadata provider not available"}, status=503) return web.json_response(
{"success": False, "error": "Metadata provider not available"},
status=503,
)
try: try:
models = await metadata_provider.get_user_models(username) models = await metadata_provider.get_user_models(username)
except NotImplementedError: except NotImplementedError:
return web.json_response({"success": False, "error": "Metadata provider does not support user model queries"}, status=501) return web.json_response(
{
"success": False,
"error": "Metadata provider does not support user model queries",
},
status=501,
)
if models is None: if models is None:
return web.json_response({"success": False, "error": "Failed to fetch user models"}, status=502) return web.json_response(
{"success": False, "error": "Failed to fetch user models"},
status=502,
)
if not isinstance(models, list): if not isinstance(models, list):
models = [] models = []
@@ -704,7 +883,9 @@ class ModelLibraryHandler:
checkpoint_scanner = await self._service_registry.get_checkpoint_scanner() checkpoint_scanner = await self._service_registry.get_checkpoint_scanner()
embedding_scanner = await self._service_registry.get_embedding_scanner() embedding_scanner = await self._service_registry.get_embedding_scanner()
normalized_allowed_types = {model_type.lower() for model_type in CIVITAI_USER_MODEL_TYPES} normalized_allowed_types = {
model_type.lower() for model_type in CIVITAI_USER_MODEL_TYPES
}
lora_type_aliases = {model_type.lower() for model_type in VALID_LORA_TYPES} lora_type_aliases = {model_type.lower() for model_type in VALID_LORA_TYPES}
type_scanner_map: Dict[str, object | None] = { type_scanner_map: Dict[str, object | None] = {
@@ -724,7 +905,13 @@ class ModelLibraryHandler:
scanner = type_scanner_map.get(model_type) scanner = type_scanner_map.get(model_type)
if scanner is None: if scanner is None:
return web.json_response({"success": False, "error": f'Scanner for type "{model_type}" is not available'}, status=503) return web.json_response(
{
"success": False,
"error": f'Scanner for type "{model_type}" is not available',
},
status=503,
)
tags_value = model.get("tags") tags_value = model.get("tags")
tags = tags_value if isinstance(tags_value, list) else [] tags = tags_value if isinstance(tags_value, list) else []
@@ -759,7 +946,9 @@ class ModelLibraryHandler:
rewritten_url, _ = rewrite_preview_url(raw_url, media_type) rewritten_url, _ = rewrite_preview_url(raw_url, media_type)
thumbnail_url = rewritten_url thumbnail_url = rewritten_url
in_library = await scanner.check_model_version_exists(version_id_int) in_library = await scanner.check_model_version_exists(
version_id_int
)
versions.append( versions.append(
{ {
@@ -775,7 +964,9 @@ class ModelLibraryHandler:
} }
) )
return web.json_response({"success": True, "username": username, "versions": versions}) return web.json_response(
{"success": True, "username": username, "versions": versions}
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Failed to get Civitai user models: %s", exc, exc_info=True) logger.error("Failed to get Civitai user models: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -785,9 +976,13 @@ class MetadataArchiveHandler:
def __init__( def __init__(
self, self,
*, *,
metadata_archive_manager_factory: Callable[[], Awaitable[MetadataArchiveManagerProtocol]] = get_metadata_archive_manager, metadata_archive_manager_factory: Callable[
[], Awaitable[MetadataArchiveManagerProtocol]
] = get_metadata_archive_manager,
settings_service=None, settings_service=None,
metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers, metadata_provider_updater: Callable[
[], Awaitable[None]
] = update_metadata_providers,
) -> None: ) -> None:
self._metadata_archive_manager_factory = metadata_archive_manager_factory self._metadata_archive_manager_factory = metadata_archive_manager_factory
self._settings = settings_service or get_settings_manager() self._settings = settings_service or get_settings_manager()
@@ -799,18 +994,37 @@ class MetadataArchiveHandler:
download_id = request.query.get("download_id") download_id = request.query.get("download_id")
def progress_callback(stage: str, message: str) -> None: def progress_callback(stage: str, message: str) -> None:
data = {"stage": stage, "message": message, "type": "metadata_archive_download"} data = {
"stage": stage,
"message": message,
"type": "metadata_archive_download",
}
if download_id: if download_id:
asyncio.create_task(ws_manager.broadcast_download_progress(download_id, data)) asyncio.create_task(
ws_manager.broadcast_download_progress(download_id, data)
)
else: else:
asyncio.create_task(ws_manager.broadcast(data)) asyncio.create_task(ws_manager.broadcast(data))
success = await archive_manager.download_and_extract_database(progress_callback) success = await archive_manager.download_and_extract_database(
progress_callback
)
if success: if success:
self._settings.set("enable_metadata_archive_db", True) self._settings.set("enable_metadata_archive_db", True)
await self._metadata_provider_updater() await self._metadata_provider_updater()
return web.json_response({"success": True, "message": "Metadata archive database downloaded and extracted successfully"}) return web.json_response(
return web.json_response({"success": False, "error": "Failed to download and extract metadata archive database"}, status=500) {
"success": True,
"message": "Metadata archive database downloaded and extracted successfully",
}
)
return web.json_response(
{
"success": False,
"error": "Failed to download and extract metadata archive database",
},
status=500,
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error downloading metadata archive: %s", exc, exc_info=True) logger.error("Error downloading metadata archive: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -822,8 +1036,19 @@ class MetadataArchiveHandler:
if success: if success:
self._settings.set("enable_metadata_archive_db", False) self._settings.set("enable_metadata_archive_db", False)
await self._metadata_provider_updater() await self._metadata_provider_updater()
return web.json_response({"success": True, "message": "Metadata archive database removed successfully"}) return web.json_response(
return web.json_response({"success": False, "error": "Failed to remove metadata archive database"}, status=500) {
"success": True,
"message": "Metadata archive database removed successfully",
}
)
return web.json_response(
{
"success": False,
"error": "Failed to remove metadata archive database",
},
status=500,
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error removing metadata archive: %s", exc, exc_info=True) logger.error("Error removing metadata archive: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -844,11 +1069,15 @@ class MetadataArchiveHandler:
"isAvailable": is_available, "isAvailable": is_available,
"isEnabled": is_enabled, "isEnabled": is_enabled,
"databaseSize": db_size, "databaseSize": db_size,
"databasePath": archive_manager.get_database_path() if is_available else None, "databasePath": archive_manager.get_database_path()
if is_available
else None,
} }
) )
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Error getting metadata archive status: %s", exc, exc_info=True) logger.error(
"Error getting metadata archive status: %s", exc, exc_info=True
)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -861,21 +1090,55 @@ class FileSystemHandler:
data = await request.json() data = await request.json()
file_path = data.get("file_path") file_path = data.get("file_path")
if not file_path: if not file_path:
return web.json_response({"success": False, "error": "Missing file_path parameter"}, status=400) return web.json_response(
{"success": False, "error": "Missing file_path parameter"},
status=400,
)
file_path = os.path.abspath(file_path) file_path = os.path.abspath(file_path)
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
return web.json_response({"success": False, "error": "File does not exist"}, status=404) return web.json_response(
{"success": False, "error": "File does not exist"}, status=404
)
if os.name == "nt": if os.name == "nt":
subprocess.Popen(["explorer", "/select,", file_path]) subprocess.Popen(["explorer", "/select,", file_path])
elif os.name == "posix": elif os.name == "posix":
if sys.platform == "darwin": if _is_docker():
return web.json_response(
{
"success": True,
"message": "Running in Docker: Path available for copying",
"path": file_path,
"mode": "clipboard",
}
)
elif _is_wsl():
windows_path = _wsl_to_windows_path(file_path)
if windows_path:
subprocess.Popen(["explorer.exe", "/select,", windows_path])
else:
logger.error(
"Failed to convert WSL path to Windows path: %s", file_path
)
return web.json_response(
{
"success": False,
"error": "Failed to open file location: path conversion error",
},
status=500,
)
elif sys.platform == "darwin":
subprocess.Popen(["open", "-R", file_path]) subprocess.Popen(["open", "-R", file_path])
else: else:
folder = os.path.dirname(file_path) folder = os.path.dirname(file_path)
subprocess.Popen(["xdg-open", folder]) subprocess.Popen(["xdg-open", folder])
return web.json_response({"success": True, "message": f"Opened folder and selected file: {file_path}"}) return web.json_response(
{
"success": True,
"message": f"Opened folder and selected file: {file_path}",
}
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Failed to open file location: %s", exc, exc_info=True) logger.error("Failed to open file location: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -884,22 +1147,54 @@ class FileSystemHandler:
try: try:
settings_file = getattr(self._settings, "settings_file", None) settings_file = getattr(self._settings, "settings_file", None)
if not settings_file: if not settings_file:
return web.json_response({"success": False, "error": "Settings file not found"}, status=404) return web.json_response(
{"success": False, "error": "Settings file not found"}, status=404
)
settings_file = os.path.abspath(settings_file) settings_file = os.path.abspath(settings_file)
if not os.path.isfile(settings_file): if not os.path.isfile(settings_file):
return web.json_response({"success": False, "error": "Settings file does not exist"}, status=404) return web.json_response(
{"success": False, "error": "Settings file does not exist"},
status=404,
)
if os.name == "nt": if os.name == "nt":
subprocess.Popen(["explorer", "/select,", settings_file]) subprocess.Popen(["explorer", "/select,", settings_file])
elif os.name == "posix": elif os.name == "posix":
if sys.platform == "darwin": if _is_docker():
return web.json_response(
{
"success": True,
"message": "Running in Docker: Path available for copying",
"path": settings_file,
"mode": "clipboard",
}
)
elif _is_wsl():
windows_path = _wsl_to_windows_path(settings_file)
if windows_path:
subprocess.Popen(["explorer.exe", "/select,", windows_path])
else:
logger.error(
"Failed to convert WSL path to Windows path: %s",
settings_file,
)
return web.json_response(
{
"success": False,
"error": "Failed to open settings location: path conversion error",
},
status=500,
)
elif sys.platform == "darwin":
subprocess.Popen(["open", "-R", settings_file]) subprocess.Popen(["open", "-R", settings_file])
else: else:
folder = os.path.dirname(settings_file) folder = os.path.dirname(settings_file)
subprocess.Popen(["xdg-open", folder]) subprocess.Popen(["xdg-open", folder])
return web.json_response({"success": True, "message": f"Opened settings folder: {settings_file}"}) return web.json_response(
{"success": True, "message": f"Opened settings folder: {settings_file}"}
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Failed to open settings location: %s", exc, exc_info=True) logger.error("Failed to open settings location: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -922,21 +1217,44 @@ class NodeRegistryHandler:
data = await request.json() data = await request.json()
nodes = data.get("nodes", []) nodes = data.get("nodes", [])
if not isinstance(nodes, list): if not isinstance(nodes, list):
return web.json_response({"success": False, "error": "nodes must be a list"}, status=400) return web.json_response(
{"success": False, "error": "nodes must be a list"}, status=400
)
for index, node in enumerate(nodes): for index, node in enumerate(nodes):
if not isinstance(node, dict): if not isinstance(node, dict):
return web.json_response({"success": False, "error": f"Node {index} must be an object"}, status=400) return web.json_response(
{"success": False, "error": f"Node {index} must be an object"},
status=400,
)
node_id = node.get("node_id") node_id = node.get("node_id")
if node_id is None: if node_id is None:
return web.json_response({"success": False, "error": f"Node {index} missing node_id parameter"}, status=400) return web.json_response(
{
"success": False,
"error": f"Node {index} missing node_id parameter",
},
status=400,
)
graph_id = node.get("graph_id") graph_id = node.get("graph_id")
if graph_id is None: if graph_id is None:
return web.json_response({"success": False, "error": f"Node {index} missing graph_id parameter"}, status=400) return web.json_response(
{
"success": False,
"error": f"Node {index} missing graph_id parameter",
},
status=400,
)
graph_name = node.get("graph_name") graph_name = node.get("graph_name")
try: try:
node["node_id"] = int(node_id) node["node_id"] = int(node_id)
except (TypeError, ValueError): except (TypeError, ValueError):
return web.json_response({"success": False, "error": f"Node {index} node_id must be an integer"}, status=400) return web.json_response(
{
"success": False,
"error": f"Node {index} node_id must be an integer",
},
status=400,
)
node["graph_id"] = str(graph_id) node["graph_id"] = str(graph_id)
if graph_name is None: if graph_name is None:
node["graph_name"] = None node["graph_name"] = None
@@ -946,7 +1264,12 @@ class NodeRegistryHandler:
node["graph_name"] = str(graph_name) node["graph_name"] = str(graph_name)
await self._node_registry.register_nodes(nodes) await self._node_registry.register_nodes(nodes)
return web.json_response({"success": True, "message": f"{len(nodes)} nodes registered successfully"}) return web.json_response(
{
"success": True,
"message": f"{len(nodes)} nodes registered successfully",
}
)
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Failed to register nodes: %s", exc, exc_info=True) logger.error("Failed to register nodes: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
@@ -994,7 +1317,10 @@ class NodeRegistryHandler:
return web.json_response({"success": True, "data": registry_info}) return web.json_response({"success": True, "data": registry_info})
except Exception as exc: # pragma: no cover - defensive logging except Exception as exc: # pragma: no cover - defensive logging
logger.error("Failed to get registry: %s", exc, exc_info=True) logger.error("Failed to get registry: %s", exc, exc_info=True)
return web.json_response({"success": False, "error": "Internal Error", "message": str(exc)}, status=500) return web.json_response(
{"success": False, "error": "Internal Error", "message": str(exc)},
status=500,
)
async def update_node_widget(self, request: web.Request) -> web.Response: async def update_node_widget(self, request: web.Request) -> web.Response:
try: try:
@@ -1004,10 +1330,15 @@ class NodeRegistryHandler:
node_ids = data.get("node_ids") node_ids = data.get("node_ids")
if not isinstance(widget_name, str) or not widget_name: if not isinstance(widget_name, str) or not widget_name:
return web.json_response({"success": False, "error": "Missing widget_name parameter"}, status=400) return web.json_response(
{"success": False, "error": "Missing widget_name parameter"},
status=400,
)
if not isinstance(value, str) or not value: if not isinstance(value, str) or not value:
return web.json_response({"success": False, "error": "Missing value parameter"}, status=400) return web.json_response(
{"success": False, "error": "Missing value parameter"}, status=400
)
if not isinstance(node_ids, list) or not node_ids: if not isinstance(node_ids, list) or not node_ids:
return web.json_response( return web.json_response(
@@ -1107,7 +1438,9 @@ class MiscHandlerSet:
self.metadata_archive = metadata_archive self.metadata_archive = metadata_archive
self.filesystem = filesystem self.filesystem = filesystem
def to_route_mapping(self) -> Mapping[str, Callable[[web.Request], Awaitable[web.StreamResponse]]]: def to_route_mapping(
self,
) -> Mapping[str, Callable[[web.Request], Awaitable[web.StreamResponse]]]:
return { return {
"health_check": self.health.health_check, "health_check": self.health.health_check,
"get_settings": self.settings.get_settings, "get_settings": self.settings.get_settings,

View File

@@ -1001,7 +1001,20 @@ async function openFileLocation(filePath) {
body: JSON.stringify({ 'file_path': filePath }) body: JSON.stringify({ 'file_path': filePath })
}); });
if (!resp.ok) throw new Error('Failed to open file location'); if (!resp.ok) throw new Error('Failed to open file location');
showToast('modals.model.openFileLocation.success', {}, 'success');
const data = await resp.json();
if (data.mode === 'clipboard' && data.path) {
try {
await navigator.clipboard.writeText(data.path);
showToast('modals.model.openFileLocation.copied', { path: data.path }, 'success');
} catch (clipboardErr) {
console.warn('Clipboard API not available:', clipboardErr);
showToast('modals.model.openFileLocation.clipboardFallback', { path: data.path }, 'info');
}
} else {
showToast('modals.model.openFileLocation.success', {}, 'success');
}
} catch (err) { } catch (err) {
showToast('modals.model.openFileLocation.failed', {}, 'error'); showToast('modals.model.openFileLocation.failed', {}, 'error');
} }

View File

@@ -364,7 +364,19 @@ export class SettingsManager {
throw new Error(`Request failed with status ${response.status}`); throw new Error(`Request failed with status ${response.status}`);
} }
showToast('settings.openSettingsFileLocation.success', {}, 'success'); const data = await response.json();
if (data.mode === 'clipboard' && data.path) {
try {
await navigator.clipboard.writeText(data.path);
showToast('settings.openSettingsFileLocation.copied', { path: data.path }, 'success');
} catch (clipboardErr) {
console.warn('Clipboard API not available:', clipboardErr);
showToast('settings.openSettingsFileLocation.clipboardFallback', { path: data.path }, 'info');
}
} else {
showToast('settings.openSettingsFileLocation.success', {}, 'success');
}
} catch (error) { } catch (error) {
console.error('Failed to open settings file location:', error); console.error('Failed to open settings file location:', error);
showToast('settings.openSettingsFileLocation.failed', {}, 'error'); showToast('settings.openSettingsFileLocation.failed', {}, 'error');

View File

@@ -1,5 +1,8 @@
import json import json
import os
import subprocess
from types import SimpleNamespace from types import SimpleNamespace
from unittest.mock import patch, MagicMock
import pytest import pytest
from aiohttp import web from aiohttp import web
@@ -11,6 +14,9 @@ from py.routes.handlers.misc_handlers import (
NodeRegistryHandler, NodeRegistryHandler,
ServiceRegistryAdapter, ServiceRegistryAdapter,
SettingsHandler, SettingsHandler,
_is_wsl,
_wsl_to_windows_path,
_is_docker,
) )
from py.routes.misc_route_registrar import MISC_ROUTE_DEFINITIONS, MiscRouteRegistrar from py.routes.misc_route_registrar import MISC_ROUTE_DEFINITIONS, MiscRouteRegistrar
from py.routes.misc_routes import MiscRoutes from py.routes.misc_routes import MiscRoutes
@@ -114,11 +120,15 @@ def test_misc_route_registrar_registers_all_routes():
async def dummy_handler(_request): async def dummy_handler(_request):
return web.Response() return web.Response()
handler_mapping = {definition.handler_name: dummy_handler for definition in MISC_ROUTE_DEFINITIONS} handler_mapping = {
definition.handler_name: dummy_handler for definition in MISC_ROUTE_DEFINITIONS
}
registrar.register_routes(handler_mapping) registrar.register_routes(handler_mapping)
registered = {(method, path) for method, path, _ in app.router.calls} registered = {(method, path) for method, path, _ in app.router.calls}
expected = {(definition.method, definition.path) for definition in MISC_ROUTE_DEFINITIONS} expected = {
(definition.method, definition.path) for definition in MISC_ROUTE_DEFINITIONS
}
assert registered == expected assert registered == expected
@@ -236,7 +246,10 @@ async def test_register_nodes_includes_capabilities():
"graph_id": "root", "graph_id": "root",
"type": "CheckpointLoaderSimple", "type": "CheckpointLoaderSimple",
"title": "Checkpoint Loader", "title": "Checkpoint Loader",
"capabilities": {"supports_lora": False, "widget_names": ["ckpt_name", "", 42]}, "capabilities": {
"supports_lora": False,
"widget_names": ["ckpt_name", "", 42],
},
} }
] ]
} }
@@ -249,7 +262,10 @@ async def test_register_nodes_includes_capabilities():
registry = await node_registry.get_registry() registry = await node_registry.get_registry()
stored_node = next(iter(registry["nodes"].values())) stored_node = next(iter(registry["nodes"].values()))
assert stored_node["capabilities"] == {"supports_lora": False, "widget_names": ["ckpt_name"]} assert stored_node["capabilities"] == {
"supports_lora": False,
"widget_names": ["ckpt_name"],
}
assert stored_node["widget_names"] == ["ckpt_name"] assert stored_node["widget_names"] == ["ckpt_name"]
@@ -286,7 +302,12 @@ async def test_update_node_widget_sends_payload():
assert send_calls == [ assert send_calls == [
( (
"lm_widget_update", "lm_widget_update",
{"id": 12, "widget_name": "ckpt_name", "value": "models/checkpoints/model.ckpt", "graph_id": "root"}, {
"id": 12,
"widget_name": "ckpt_name",
"value": "models/checkpoints/model.ckpt",
"graph_id": "root",
},
) )
] ]
@@ -428,7 +449,9 @@ async def test_misc_routes_bind_produces_expected_handlers():
controller = MiscRoutes( controller = MiscRoutes(
settings_service=DummySettings(), settings_service=DummySettings(),
usage_stats_factory=lambda: SimpleNamespace(process_execution=noop_async, get_stats=noop_async), usage_stats_factory=lambda: SimpleNamespace(
process_execution=noop_async, get_stats=noop_async
),
prompt_server=FakePromptServer, prompt_server=FakePromptServer,
service_registry_adapter=service_registry_adapter, service_registry_adapter=service_registry_adapter,
metadata_provider_factory=fake_metadata_provider_factory, metadata_provider_factory=fake_metadata_provider_factory,
@@ -545,7 +568,9 @@ async def test_get_civitai_user_models_marks_library_versions():
metadata_provider_factory=provider_factory, metadata_provider_factory=provider_factory,
) )
response = await handler.get_civitai_user_models(FakeRequest(query={"username": "pixel"})) response = await handler.get_civitai_user_models(
FakeRequest(query={"username": "pixel"})
)
payload = json.loads(response.text) payload = json.loads(response.text)
assert payload["success"] is True assert payload["success"] is True
@@ -657,12 +682,19 @@ async def test_get_civitai_user_models_rewrites_civitai_previews():
metadata_provider_factory=provider_factory, metadata_provider_factory=provider_factory,
) )
response = await handler.get_civitai_user_models(FakeRequest(query={"username": "pixel"})) response = await handler.get_civitai_user_models(
FakeRequest(query={"username": "pixel"})
)
payload = json.loads(response.text) payload = json.loads(response.text)
assert payload["success"] is True assert payload["success"] is True
previews_by_version = {item["versionId"]: item["thumbnailUrl"] for item in payload["versions"]} previews_by_version = {
assert previews_by_version[100] == "https://image.civitai.com/container/example/width=450,optimized=true/sample.jpeg" item["versionId"]: item["thumbnailUrl"] for item in payload["versions"]
}
assert (
previews_by_version[100]
== "https://image.civitai.com/container/example/width=450,optimized=true/sample.jpeg"
)
assert ( assert (
previews_by_version[101] previews_by_version[101]
== "https://image.civitai.com/container/example/transcode=true,width=450,optimized=true/sample.mp4" == "https://image.civitai.com/container/example/transcode=true,width=450,optimized=true/sample.mp4"
@@ -706,7 +738,9 @@ def test_ensure_handler_mapping_caches_result():
controller = MiscRoutes( controller = MiscRoutes(
settings_service=DummySettings(), settings_service=DummySettings(),
usage_stats_factory=lambda: SimpleNamespace(process_execution=noop_async, get_stats=noop_async), usage_stats_factory=lambda: SimpleNamespace(
process_execution=noop_async, get_stats=noop_async
),
prompt_server=FakePromptServer, prompt_server=FakePromptServer,
service_registry_adapter=ServiceRegistryAdapter( service_registry_adapter=ServiceRegistryAdapter(
get_lora_scanner=fake_scanner_factory, get_lora_scanner=fake_scanner_factory,
@@ -723,15 +757,17 @@ def test_ensure_handler_mapping_caches_result():
first_mapping = controller._ensure_handler_mapping() first_mapping = controller._ensure_handler_mapping()
second_mapping = controller._ensure_handler_mapping() second_mapping = controller._ensure_handler_mapping()
assert first_mapping is second_mapping, "Expected cached handler mapping to be reused" assert first_mapping is second_mapping, (
"Expected cached handler mapping to be reused"
)
assert len(call_records) == 1, "Handler set factory should only be invoked once" assert len(call_records) == 1, "Handler set factory should only be invoked once"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_check_model_exists_returns_local_versions(): async def test_check_model_exists_returns_local_versions():
versions = [ versions = [
{'versionId': 11, 'name': 'v1', 'fileName': 'model-one'}, {"versionId": 11, "name": "v1", "fileName": "model-one"},
{'versionId': 12, 'name': 'v2', 'fileName': 'model-two'}, {"versionId": 12, "name": "v2", "fileName": "model-two"},
] ]
lora_scanner = RecordingVersionScanner(versions) lora_scanner = RecordingVersionScanner(versions)
@@ -756,12 +792,12 @@ async def test_check_model_exists_returns_local_versions():
metadata_provider_factory=fake_metadata_provider_factory, metadata_provider_factory=fake_metadata_provider_factory,
) )
response = await handler.check_model_exists(FakeRequest(query={'modelId': '5'})) response = await handler.check_model_exists(FakeRequest(query={"modelId": "5"}))
payload = json.loads(response.text) payload = json.loads(response.text)
assert payload['success'] is True assert payload["success"] is True
assert payload['modelType'] == 'lora' assert payload["modelType"] == "lora"
assert payload['versions'] == versions assert payload["versions"] == versions
assert lora_scanner.version_calls == [5] assert lora_scanner.version_calls == [5]
@@ -814,3 +850,119 @@ def test_create_handler_set_uses_provided_dependencies():
assert node_registry_handler._node_registry is fake_node_registry assert node_registry_handler._node_registry is fake_node_registry
assert node_registry_handler._prompt_server is CustomPromptServer assert node_registry_handler._prompt_server is CustomPromptServer
assert node_registry_handler._standalone_mode is True assert node_registry_handler._standalone_mode is True
def test_is_wsl_returns_true_in_wsl_environment():
version_content = "Linux version 6.6.87.2-microsoft-standard-WSL2"
with patch("py.routes.handlers.misc_handlers.open") as mock_open:
mock_file = MagicMock()
mock_file.read.return_value = version_content
mock_open.return_value.__enter__ = MagicMock(return_value=mock_file)
mock_open.return_value.__exit__ = MagicMock(return_value=False)
result = _is_wsl()
assert result is True
def test_is_wsl_returns_false_in_non_wsl_environment():
version_content = "Linux version 6.6.0-25-generic #26-Ubuntu SMP PREEMPT_DYNAMIC"
with patch("py.routes.handlers.misc_handlers.open") as mock_open:
mock_file = MagicMock()
mock_file.read.return_value = version_content
mock_open.return_value.__enter__ = MagicMock(return_value=mock_file)
mock_open.return_value.__exit__ = MagicMock(return_value=False)
result = _is_wsl()
assert result is False
def test_is_wsl_returns_false_on_read_error():
with patch("py.routes.handlers.misc_handlers.open", side_effect=OSError()):
result = _is_wsl()
assert result is False
def test_is_wsl_returns_false_on_read_error():
with patch("builtins.open", side_effect=OSError()):
result = _is_wsl()
assert result is False
def test_wsl_to_windows_path_converts_successfully():
with patch("subprocess.run") as mock_run:
mock_result = MagicMock()
mock_result.stdout = "C:\\Users\\test\\file.txt\n"
mock_run.return_value = mock_result
result = _wsl_to_windows_path("/mnt/c/test")
assert result == "C:\\Users\\test\\file.txt"
mock_run.assert_called_once()
def test_wsl_to_windows_path_returns_none_on_error():
with patch("subprocess.run", side_effect=FileNotFoundError()):
result = _wsl_to_windows_path("/mnt/c/test")
assert result is None
def test_wsl_to_windows_path_returns_none_on_subprocess_error():
with patch(
"subprocess.run", side_effect=subprocess.CalledProcessError(1, "wslpath")
):
result = _wsl_to_windows_path("/mnt/c/test")
assert result is None
def test_is_docker_returns_true_when_dockerenv_exists():
with patch("os.path.exists", return_value=True):
result = _is_docker()
assert result is True
def test_is_docker_checks_cgroup_when_dockerenv_missing():
cgroup_content = "1:name=systemd:/docker/abc123\n"
with patch("os.path.exists", return_value=False):
with patch("py.routes.handlers.misc_handlers.open") as mock_open:
mock_file = MagicMock()
mock_file.read.return_value = cgroup_content
mock_open.return_value.__enter__ = MagicMock(return_value=mock_file)
mock_open.return_value.__exit__ = MagicMock(return_value=False)
result = _is_docker()
assert result is True
def test_is_docker_detects_kubernetes():
cgroup_content = "12:pids:/kubepods/besteffort/pod123/abc123\n"
with patch("os.path.exists", return_value=False):
with patch("py.routes.handlers.misc_handlers.open") as mock_open:
mock_file = MagicMock()
mock_file.read.return_value = cgroup_content
mock_open.return_value.__enter__ = MagicMock(return_value=mock_file)
mock_open.return_value.__exit__ = MagicMock(return_value=False)
result = _is_docker()
assert result is True
def test_is_docker_returns_false_when_no_docker_detected():
cgroup_content = "1:name=systemd:/user.slice/user-1000.slice\n"
with patch("os.path.exists", return_value=False):
with patch("py.routes.handlers.misc_handlers.open") as mock_open:
mock_file = MagicMock()
mock_file.read.return_value = cgroup_content
mock_open.return_value.__enter__ = MagicMock(return_value=mock_file)
mock_open.return_value.__exit__ = MagicMock(return_value=False)
result = _is_docker()
assert result is False
def test_is_docker_returns_false_on_cgroup_read_error():
with patch("os.path.exists", return_value=False):
with patch("py.routes.handlers.misc_handlers.open", side_effect=OSError()):
result = _is_docker()
assert result is False
def test_wsl_to_windows_path_returns_none_on_subprocess_error(tmp_path):
with patch(
"subprocess.run", side_effect=subprocess.CalledProcessError(1, "wslpath")
):
result = _wsl_to_windows_path("/mnt/c/test")
assert result is None