diff --git a/locales/de.json b/locales/de.json index 887b59de..82459ebc 100644 --- a/locales/de.json +++ b/locales/de.json @@ -233,7 +233,9 @@ "label": "Einstellungsordner öffnen", "tooltip": "Den Ordner mit der settings.json öffnen", "success": "Einstellungsordner geöffnet", - "failed": "Einstellungsordner konnte nicht geöffnet werden" + "failed": "Einstellungsordner konnte nicht geöffnet werden", + "copied": "Einstellungspfad in die Zwischenablage kopiert: {{path}}", + "clipboardFallback": "Einstellungspfad: {{path}}" }, "sections": { "contentFiltering": "Inhaltsfilterung", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "Dateispeicherort erfolgreich geöffnet", - "failed": "Fehler beim Öffnen des Dateispeicherorts" + "failed": "Fehler beim Öffnen des Dateispeicherorts", + "copied": "Pfad in die Zwischenablage kopiert: {{path}}", + "clipboardFallback": "Pfad: {{path}}" }, "metadata": { "version": "Version", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/locales/en.json b/locales/en.json index 7ad7fa4d..ca48784a 100644 --- a/locales/en.json +++ b/locales/en.json @@ -231,9 +231,11 @@ "civitaiApiKeyHelp": "Used for authentication when downloading models from Civitai", "openSettingsFileLocation": { "label": "Open settings folder", - "tooltip": "Open the folder containing settings.json", + "tooltip": "Open folder containing settings.json", "success": "Opened settings.json folder", - "failed": "Failed to open settings.json folder" + "failed": "Failed to open settings.json folder", + "copied": "Settings path copied to clipboard: {{path}}", + "clipboardFallback": "Settings path: {{path}}" }, "sections": { "contentFiltering": "Content Filtering", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "File location opened successfully", - "failed": "Failed to open file location" + "failed": "Failed to open file location", + "copied": "Path copied to clipboard: {{path}}", + "clipboardFallback": "Path: {{path}}" }, "metadata": { "version": "Version", diff --git a/locales/es.json b/locales/es.json index 2954f48a..a149ce5a 100644 --- a/locales/es.json +++ b/locales/es.json @@ -233,7 +233,9 @@ "label": "Abrir carpeta de ajustes", "tooltip": "Abrir la carpeta que contiene settings.json", "success": "Carpeta de settings.json abierta", - "failed": "No se pudo abrir la carpeta de settings.json" + "failed": "No se pudo abrir la carpeta de settings.json", + "copied": "Ruta de configuración copiada al portapapeles: {{path}}", + "clipboardFallback": "Ruta de configuración: {{path}}" }, "sections": { "contentFiltering": "Filtrado de contenido", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "Ubicación del archivo abierta exitosamente", - "failed": "Error al abrir la ubicación del archivo" + "failed": "Error al abrir la ubicación del archivo", + "copied": "Ruta copiada al portapapeles: {{path}}", + "clipboardFallback": "Ruta: {{path}}" }, "metadata": { "version": "Versión", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/locales/fr.json b/locales/fr.json index b9e8aec6..8608edfa 100644 --- a/locales/fr.json +++ b/locales/fr.json @@ -233,7 +233,9 @@ "label": "Ouvrir le dossier des paramètres", "tooltip": "Ouvrir le dossier contenant settings.json", "success": "Dossier settings.json ouvert", - "failed": "Impossible d'ouvrir le dossier settings.json" + "failed": "Impossible d'ouvrir le dossier settings.json", + "copied": "Chemin des paramètres copié dans le presse-papiers: {{path}}", + "clipboardFallback": "Chemin des paramètres: {{path}}" }, "sections": { "contentFiltering": "Filtrage du contenu", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "Emplacement du fichier ouvert avec succès", - "failed": "Échec de l'ouverture de l'emplacement du fichier" + "failed": "Échec de l'ouverture de l'emplacement du fichier", + "copied": "Chemin copié dans le presse-papiers: {{path}}", + "clipboardFallback": "Chemin: {{path}}" }, "metadata": { "version": "Version", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/locales/he.json b/locales/he.json index be7744ea..2c7d691e 100644 --- a/locales/he.json +++ b/locales/he.json @@ -233,7 +233,9 @@ "label": "פתח תיקיית הגדרות", "tooltip": "פתח את התיקייה שמכילה את ‎settings.json", "success": "תיקיית settings.json נפתחה", - "failed": "לא ניתן לפתוח את תיקיית settings.json" + "failed": "לא ניתן לפתוח את תיקיית settings.json", + "copied": "נתיב ההגדרות הועתק ללוח העריכה: {{path}}", + "clipboardFallback": "נתיב ההגדרות: {{path}}" }, "sections": { "contentFiltering": "סינון תוכן", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "מיקום הקובץ נפתח בהצלחה", - "failed": "פתיחת מיקום הקובץ נכשלה" + "failed": "פתיחת מיקום הקובץ נכשלה", + "copied": "הנתיב הועתק ללוח העריכה: {{path}}", + "clipboardFallback": "נתיב: {{path}}" }, "metadata": { "version": "גרסה", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/locales/ja.json b/locales/ja.json index 570b1cd4..49e58ffc 100644 --- a/locales/ja.json +++ b/locales/ja.json @@ -233,7 +233,9 @@ "label": "設定フォルダーを開く", "tooltip": "settings.json を含むフォルダーを開きます", "success": "settings.json フォルダーを開きました", - "failed": "settings.json フォルダーを開けませんでした" + "failed": "settings.json フォルダーを開けませんでした", + "copied": "設定パスをクリップボードにコピーしました: {{path}}", + "clipboardFallback": "設定パス: {{path}}" }, "sections": { "contentFiltering": "コンテンツフィルタリング", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "ファイルの場所を正常に開きました", - "failed": "ファイルの場所を開くのに失敗しました" + "failed": "ファイルの場所を開くのに失敗しました", + "copied": "パスをクリップボードにコピーしました: {{path}}", + "clipboardFallback": "パス: {{path}}" }, "metadata": { "version": "バージョン", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/locales/ko.json b/locales/ko.json index 4551224c..831c76c0 100644 --- a/locales/ko.json +++ b/locales/ko.json @@ -233,7 +233,9 @@ "label": "설정 폴더 열기", "tooltip": "settings.json이 있는 폴더를 엽니다", "success": "settings.json 폴더를 열었습니다", - "failed": "settings.json 폴더를 열지 못했습니다" + "failed": "settings.json 폴더를 열지 못했습니다", + "copied": "설정 경로가 클립보드에 복사되었습니다: {{path}}", + "clipboardFallback": "설정 경로: {{path}}" }, "sections": { "contentFiltering": "콘텐츠 필터링", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "파일 위치가 성공적으로 열렸습니다", - "failed": "파일 위치 열기에 실패했습니다" + "failed": "파일 위치 열기에 실패했습니다", + "copied": "경로가 클립보드에 복사되었습니다: {{path}}", + "clipboardFallback": "경로: {{path}}" }, "metadata": { "version": "버전", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/locales/ru.json b/locales/ru.json index 5175625b..20d02322 100644 --- a/locales/ru.json +++ b/locales/ru.json @@ -233,7 +233,9 @@ "label": "Открыть папку настроек", "tooltip": "Открыть папку, содержащую settings.json", "success": "Папка settings.json открыта", - "failed": "Не удалось открыть папку settings.json" + "failed": "Не удалось открыть папку settings.json", + "copied": "Путь настроек скопирован в буфер обмена: {{path}}", + "clipboardFallback": "Путь настроек: {{path}}" }, "sections": { "contentFiltering": "Фильтрация контента", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "Расположение файла успешно открыто", - "failed": "Не удалось открыть расположение файла" + "failed": "Не удалось открыть расположение файла", + "copied": "Путь скопирован в буфер обмена: {{path}}", + "clipboardFallback": "Путь: {{path}}" }, "metadata": { "version": "Версия", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/locales/zh-CN.json b/locales/zh-CN.json index 2ec694cf..cc4be004 100644 --- a/locales/zh-CN.json +++ b/locales/zh-CN.json @@ -233,7 +233,9 @@ "label": "打开设置文件夹", "tooltip": "打开包含 settings.json 的文件夹", "success": "已打开 settings.json 文件夹", - "failed": "无法打开 settings.json 文件夹" + "failed": "无法打开 settings.json 文件夹", + "copied": "设置路径已复制到剪贴板:{{path}}", + "clipboardFallback": "设置路径:{{path}}" }, "sections": { "contentFiltering": "内容过滤", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "文件位置已成功打开", - "failed": "打开文件位置失败" + "failed": "打开文件位置失败", + "copied": "路径已复制到剪贴板:{{path}}", + "clipboardFallback": "路径:{{path}}" }, "metadata": { "version": "版本", diff --git a/locales/zh-TW.json b/locales/zh-TW.json index 664b2f42..88ca11ae 100644 --- a/locales/zh-TW.json +++ b/locales/zh-TW.json @@ -233,7 +233,9 @@ "label": "開啟設定資料夾", "tooltip": "開啟包含 settings.json 的資料夾", "success": "已開啟 settings.json 資料夾", - "failed": "無法開啟 settings.json 資料夾" + "failed": "無法開啟 settings.json 資料夾", + "copied": "設定路徑已複製到剪貼簿:{{path}}", + "clipboardFallback": "設定路徑:{{path}}" }, "sections": { "contentFiltering": "內容過濾", @@ -895,7 +897,9 @@ }, "openFileLocation": { "success": "檔案位置已成功開啟", - "failed": "開啟檔案位置失敗" + "failed": "開啟檔案位置失敗", + "copied": "路徑已複製到剪貼簿:{{path}}", + "clipboardFallback": "路徑:{{path}}" }, "metadata": { "version": "版本", @@ -1531,4 +1535,4 @@ "learnMore": "LM Civitai Extension Tutorial" } } -} \ No newline at end of file +} diff --git a/py/routes/handlers/misc_handlers.py b/py/routes/handlers/misc_handlers.py index 4dc7d300..f0027c73 100644 --- a/py/routes/handlers/misc_handlers.py +++ b/py/routes/handlers/misc_handlers.py @@ -43,12 +43,55 @@ from ...utils.usage_stats import UsageStats logger = logging.getLogger(__name__) +def _is_wsl() -> bool: + """Check if running in WSL environment.""" + try: + with open("/proc/version", "r") as f: + version_info = f.read().lower() + return "microsoft" in version_info or "wsl" in version_info + except (OSError, IOError): + return False + + +def _is_docker() -> bool: + """Check if running in Docker container.""" + dockerenv_exists = os.path.exists("/.dockerenv") + if dockerenv_exists: + return True + + try: + with open("/proc/1/cgroup", "r") as f: + cgroup_content = f.read() + return ( + "docker" in cgroup_content.lower() + or "kubepods" in cgroup_content.lower() + ) + except (OSError, IOError): + return False + + +def _wsl_to_windows_path(wsl_path: str) -> str | None: + """Convert WSL path to Windows path using wslpath.""" + try: + result = subprocess.run( + ["wslpath", "-w", wsl_path], + capture_output=True, + text=True, + check=True, + ) + return result.stdout.strip() + except (subprocess.CalledProcessError, FileNotFoundError, OSError): + return None + + class PromptServerProtocol(Protocol): """Subset of PromptServer used by the handlers.""" instance: "PromptServerProtocol" - def send_sync(self, event: str, payload: dict) -> None: # pragma: no cover - protocol + def send_sync( + self, event: str, payload: dict + ) -> None: # pragma: no cover - protocol ... @@ -63,7 +106,9 @@ class UsageStatsFactory(Protocol): class MetadataProviderProtocol(Protocol): - async def get_model_versions(self, model_id: int) -> dict | None: # pragma: no cover - protocol + async def get_model_versions( + self, model_id: int + ) -> dict | None: # pragma: no cover - protocol ... @@ -109,7 +154,11 @@ class NodeRegistry: raw_widget_names: list | None = node.get("widget_names") if not isinstance(raw_widget_names, list): capability_widget_names = capabilities.get("widget_names") - raw_widget_names = capability_widget_names if isinstance(capability_widget_names, list) else None + raw_widget_names = ( + capability_widget_names + if isinstance(capability_widget_names, list) + else None + ) widget_names: list[str] = [] if isinstance(raw_widget_names, list): @@ -205,14 +254,25 @@ class SettingsHandler: "auto_organize_exclusions", ) - _PROXY_KEYS = {"proxy_enabled", "proxy_host", "proxy_port", "proxy_username", "proxy_password", "proxy_type"} + _PROXY_KEYS = { + "proxy_enabled", + "proxy_host", + "proxy_port", + "proxy_username", + "proxy_password", + "proxy_type", + } def __init__( self, *, settings_service=None, - metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers, - downloader_factory: Callable[[], Awaitable[DownloaderProtocol]] = get_downloader, + metadata_provider_updater: Callable[ + [], Awaitable[None] + ] = update_metadata_providers, + downloader_factory: Callable[ + [], Awaitable[DownloaderProtocol] + ] = get_downloader, ) -> None: self._settings = settings_service or get_settings_manager() self._metadata_provider_updater = metadata_provider_updater @@ -248,11 +308,13 @@ class SettingsHandler: response_data["settings_file"] = settings_file messages_getter = getattr(self._settings, "get_startup_messages", None) messages = list(messages_getter()) if callable(messages_getter) else [] - return web.json_response({ - "success": True, - "settings": response_data, - "messages": messages, - }) + return web.json_response( + { + "success": True, + "settings": response_data, + "messages": messages, + } + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Error getting settings: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -271,8 +333,12 @@ class SettingsHandler: try: data = await request.json() except Exception as exc: # pragma: no cover - defensive logging - logger.error("Error parsing activate library request: %s", exc, exc_info=True) - return web.json_response({"success": False, "error": "Invalid JSON payload"}, status=400) + logger.error( + "Error parsing activate library request: %s", exc, exc_info=True + ) + return web.json_response( + {"success": False, "error": "Invalid JSON payload"}, status=400 + ) library_name = data.get("library") or data.get("library_name") if not isinstance(library_name, str) or not library_name.strip(): @@ -297,7 +363,9 @@ class SettingsHandler: logger.debug("Attempted to activate unknown library '%s'", library_name) return web.json_response({"success": False, "error": str(exc)}, status=404) except Exception as exc: # pragma: no cover - defensive logging - logger.error("Error activating library '%s': %s", library_name, exc, exc_info=True) + logger.error( + "Error activating library '%s': %s", library_name, exc, exc_info=True + ) return web.json_response({"success": False, "error": str(exc)}, status=500) async def update_settings(self, request: web.Request) -> web.Response: @@ -312,9 +380,14 @@ class SettingsHandler: if key == "example_images_path" and value: validation_error = self._validate_example_images_path(value) if validation_error: - return web.json_response({"success": False, "error": validation_error}) + return web.json_response( + {"success": False, "error": validation_error} + ) - if value == "__DELETE__" and key in ("proxy_username", "proxy_password"): + if value == "__DELETE__" and key in ( + "proxy_username", + "proxy_password", + ): self._settings.delete(key) else: self._settings.set(key, value) @@ -356,7 +429,9 @@ class UsageStatsHandler: data = await request.json() prompt_id = data.get("prompt_id") if not prompt_id: - return web.json_response({"success": False, "error": "Missing prompt_id"}, status=400) + return web.json_response( + {"success": False, "error": "Missing prompt_id"}, status=400 + ) usage_stats = self._usage_stats_factory() await usage_stats.process_execution(prompt_id) return web.json_response({"success": True}) @@ -387,18 +462,24 @@ class LoraCodeHandler: mode = data.get("mode", "append") if not lora_code: - return web.json_response({"success": False, "error": "Missing lora_code parameter"}, status=400) + return web.json_response( + {"success": False, "error": "Missing lora_code parameter"}, + status=400, + ) results = [] if node_ids is None: try: self._prompt_server.instance.send_sync( - "lora_code_update", {"id": -1, "lora_code": lora_code, "mode": mode} + "lora_code_update", + {"id": -1, "lora_code": lora_code, "mode": mode}, ) results.append({"node_id": "broadcast", "success": True}) except Exception as exc: # pragma: no cover - defensive logging logger.error("Error broadcasting lora code: %s", exc) - results.append({"node_id": "broadcast", "success": False, "error": str(exc)}) + results.append( + {"node_id": "broadcast", "success": False, "error": str(exc)} + ) else: for entry in node_ids: node_identifier = entry @@ -471,11 +552,19 @@ class TrainedWordsHandler: try: file_path = request.query.get("file_path") if not file_path: - return web.json_response({"success": False, "error": "Missing file_path parameter"}, status=400) + return web.json_response( + {"success": False, "error": "Missing file_path parameter"}, + status=400, + ) if not os.path.exists(file_path): - return web.json_response({"success": False, "error": "File not found"}, status=404) + return web.json_response( + {"success": False, "error": "File not found"}, status=404 + ) if not file_path.endswith(".safetensors"): - return web.json_response({"success": False, "error": "File must be a safetensors file"}, status=400) + return web.json_response( + {"success": False, "error": "File must be a safetensors file"}, + status=400, + ) trained_words, class_tokens = await extract_trained_words(file_path) return web.json_response( @@ -495,10 +584,15 @@ class ModelExampleFilesHandler: try: model_path = request.query.get("model_path") if not model_path: - return web.json_response({"success": False, "error": "Missing model_path parameter"}, status=400) + return web.json_response( + {"success": False, "error": "Missing model_path parameter"}, + status=400, + ) model_dir = os.path.dirname(model_path) if not os.path.exists(model_dir): - return web.json_response({"success": False, "error": "Model directory not found"}, status=404) + return web.json_response( + {"success": False, "error": "Model directory not found"}, status=404 + ) base_name = os.path.splitext(os.path.basename(model_path))[0] files = [] @@ -510,7 +604,10 @@ class ModelExampleFilesHandler: if not os.path.isfile(file_full_path): continue file_ext = os.path.splitext(file)[1].lower() - if file_ext not in SUPPORTED_MEDIA_EXTENSIONS["images"] and file_ext not in SUPPORTED_MEDIA_EXTENSIONS["videos"]: + if ( + file_ext not in SUPPORTED_MEDIA_EXTENSIONS["images"] + and file_ext not in SUPPORTED_MEDIA_EXTENSIONS["videos"] + ): continue try: index = int(file[len(pattern) :].split(".")[0]) @@ -545,7 +642,13 @@ class ServiceRegistryAdapter: class ModelLibraryHandler: - def __init__(self, service_registry: ServiceRegistryAdapter, metadata_provider_factory: Callable[[], Awaitable[MetadataProviderProtocol | None]]) -> None: + def __init__( + self, + service_registry: ServiceRegistryAdapter, + metadata_provider_factory: Callable[ + [], Awaitable[MetadataProviderProtocol | None] + ], + ) -> None: self._service_registry = service_registry self._metadata_provider_factory = metadata_provider_factory @@ -554,11 +657,17 @@ class ModelLibraryHandler: model_id_str = request.query.get("modelId") model_version_id_str = request.query.get("modelVersionId") if not model_id_str: - return web.json_response({"success": False, "error": "Missing required parameter: modelId"}, status=400) + return web.json_response( + {"success": False, "error": "Missing required parameter: modelId"}, + status=400, + ) try: model_id = int(model_id_str) except ValueError: - return web.json_response({"success": False, "error": "Parameter modelId must be an integer"}, status=400) + return web.json_response( + {"success": False, "error": "Parameter modelId must be an integer"}, + status=400, + ) lora_scanner = await self._service_registry.get_lora_scanner() checkpoint_scanner = await self._service_registry.get_checkpoint_scanner() @@ -568,29 +677,55 @@ class ModelLibraryHandler: try: model_version_id = int(model_version_id_str) except ValueError: - return web.json_response({"success": False, "error": "Parameter modelVersionId must be an integer"}, status=400) + return web.json_response( + { + "success": False, + "error": "Parameter modelVersionId must be an integer", + }, + status=400, + ) exists = False model_type = None if await lora_scanner.check_model_version_exists(model_version_id): exists = True model_type = "lora" - elif checkpoint_scanner and await checkpoint_scanner.check_model_version_exists(model_version_id): + elif ( + checkpoint_scanner + and await checkpoint_scanner.check_model_version_exists( + model_version_id + ) + ): exists = True model_type = "checkpoint" - elif embedding_scanner and await embedding_scanner.check_model_version_exists(model_version_id): + elif ( + embedding_scanner + and await embedding_scanner.check_model_version_exists( + model_version_id + ) + ): exists = True model_type = "embedding" - return web.json_response({"success": True, "exists": exists, "modelType": model_type if exists else None}) + return web.json_response( + { + "success": True, + "exists": exists, + "modelType": model_type if exists else None, + } + ) lora_versions = await lora_scanner.get_model_versions_by_id(model_id) checkpoint_versions = [] embedding_versions = [] if not lora_versions and checkpoint_scanner: - checkpoint_versions = await checkpoint_scanner.get_model_versions_by_id(model_id) + checkpoint_versions = await checkpoint_scanner.get_model_versions_by_id( + model_id + ) if not lora_versions and not checkpoint_versions and embedding_scanner: - embedding_versions = await embedding_scanner.get_model_versions_by_id(model_id) + embedding_versions = await embedding_scanner.get_model_versions_by_id( + model_id + ) model_type = None versions = [] @@ -604,7 +739,9 @@ class ModelLibraryHandler: model_type = "embedding" versions = embedding_versions - return web.json_response({"success": True, "modelType": model_type, "versions": versions}) + return web.json_response( + {"success": True, "modelType": model_type, "versions": versions} + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Failed to check model existence: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -613,22 +750,35 @@ class ModelLibraryHandler: try: model_id_str = request.query.get("modelId") if not model_id_str: - return web.json_response({"success": False, "error": "Missing required parameter: modelId"}, status=400) + return web.json_response( + {"success": False, "error": "Missing required parameter: modelId"}, + status=400, + ) try: model_id = int(model_id_str) except ValueError: - return web.json_response({"success": False, "error": "Parameter modelId must be an integer"}, status=400) + return web.json_response( + {"success": False, "error": "Parameter modelId must be an integer"}, + status=400, + ) metadata_provider = await self._metadata_provider_factory() if not metadata_provider: - return web.json_response({"success": False, "error": "Metadata provider not available"}, status=503) + return web.json_response( + {"success": False, "error": "Metadata provider not available"}, + status=503, + ) try: response = await metadata_provider.get_model_versions(model_id) except ResourceNotFoundError: - return web.json_response({"success": False, "error": "Model not found"}, status=404) + return web.json_response( + {"success": False, "error": "Model not found"}, status=404 + ) if not response or not response.get("modelVersions"): - return web.json_response({"success": False, "error": "Model not found"}, status=404) + return web.json_response( + {"success": False, "error": "Model not found"}, status=404 + ) versions = response.get("modelVersions", []) model_name = response.get("name", "") @@ -646,10 +796,22 @@ class ModelLibraryHandler: scanner = await self._service_registry.get_embedding_scanner() normalized_type = "embedding" else: - return web.json_response({"success": False, "error": f'Model type "{model_type}" is not supported'}, status=400) + return web.json_response( + { + "success": False, + "error": f'Model type "{model_type}" is not supported', + }, + status=400, + ) if not scanner: - return web.json_response({"success": False, "error": f'Scanner for type "{normalized_type}" is not available'}, status=503) + return web.json_response( + { + "success": False, + "error": f'Scanner for type "{normalized_type}" is not available', + }, + status=503, + ) local_versions = await scanner.get_model_versions_by_id(model_id) local_version_ids = {version["versionId"] for version in local_versions} @@ -661,7 +823,9 @@ class ModelLibraryHandler: { "id": version_id, "name": version.get("name", ""), - "thumbnailUrl": version.get("images")[0]["url"] if version.get("images") else None, + "thumbnailUrl": version.get("images")[0]["url"] + if version.get("images") + else None, "inLibrary": version_id in local_version_ids, } ) @@ -683,19 +847,34 @@ class ModelLibraryHandler: try: username = request.query.get("username") if not username: - return web.json_response({"success": False, "error": "Missing required parameter: username"}, status=400) + return web.json_response( + {"success": False, "error": "Missing required parameter: username"}, + status=400, + ) metadata_provider = await self._metadata_provider_factory() if not metadata_provider: - return web.json_response({"success": False, "error": "Metadata provider not available"}, status=503) + return web.json_response( + {"success": False, "error": "Metadata provider not available"}, + status=503, + ) try: models = await metadata_provider.get_user_models(username) except NotImplementedError: - return web.json_response({"success": False, "error": "Metadata provider does not support user model queries"}, status=501) + return web.json_response( + { + "success": False, + "error": "Metadata provider does not support user model queries", + }, + status=501, + ) if models is None: - return web.json_response({"success": False, "error": "Failed to fetch user models"}, status=502) + return web.json_response( + {"success": False, "error": "Failed to fetch user models"}, + status=502, + ) if not isinstance(models, list): models = [] @@ -704,7 +883,9 @@ class ModelLibraryHandler: checkpoint_scanner = await self._service_registry.get_checkpoint_scanner() embedding_scanner = await self._service_registry.get_embedding_scanner() - normalized_allowed_types = {model_type.lower() for model_type in CIVITAI_USER_MODEL_TYPES} + normalized_allowed_types = { + model_type.lower() for model_type in CIVITAI_USER_MODEL_TYPES + } lora_type_aliases = {model_type.lower() for model_type in VALID_LORA_TYPES} type_scanner_map: Dict[str, object | None] = { @@ -724,7 +905,13 @@ class ModelLibraryHandler: scanner = type_scanner_map.get(model_type) if scanner is None: - return web.json_response({"success": False, "error": f'Scanner for type "{model_type}" is not available'}, status=503) + return web.json_response( + { + "success": False, + "error": f'Scanner for type "{model_type}" is not available', + }, + status=503, + ) tags_value = model.get("tags") tags = tags_value if isinstance(tags_value, list) else [] @@ -759,7 +946,9 @@ class ModelLibraryHandler: rewritten_url, _ = rewrite_preview_url(raw_url, media_type) thumbnail_url = rewritten_url - in_library = await scanner.check_model_version_exists(version_id_int) + in_library = await scanner.check_model_version_exists( + version_id_int + ) versions.append( { @@ -775,7 +964,9 @@ class ModelLibraryHandler: } ) - return web.json_response({"success": True, "username": username, "versions": versions}) + return web.json_response( + {"success": True, "username": username, "versions": versions} + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Failed to get Civitai user models: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -785,9 +976,13 @@ class MetadataArchiveHandler: def __init__( self, *, - metadata_archive_manager_factory: Callable[[], Awaitable[MetadataArchiveManagerProtocol]] = get_metadata_archive_manager, + metadata_archive_manager_factory: Callable[ + [], Awaitable[MetadataArchiveManagerProtocol] + ] = get_metadata_archive_manager, settings_service=None, - metadata_provider_updater: Callable[[], Awaitable[None]] = update_metadata_providers, + metadata_provider_updater: Callable[ + [], Awaitable[None] + ] = update_metadata_providers, ) -> None: self._metadata_archive_manager_factory = metadata_archive_manager_factory self._settings = settings_service or get_settings_manager() @@ -799,18 +994,37 @@ class MetadataArchiveHandler: download_id = request.query.get("download_id") def progress_callback(stage: str, message: str) -> None: - data = {"stage": stage, "message": message, "type": "metadata_archive_download"} + data = { + "stage": stage, + "message": message, + "type": "metadata_archive_download", + } if download_id: - asyncio.create_task(ws_manager.broadcast_download_progress(download_id, data)) + asyncio.create_task( + ws_manager.broadcast_download_progress(download_id, data) + ) else: asyncio.create_task(ws_manager.broadcast(data)) - success = await archive_manager.download_and_extract_database(progress_callback) + success = await archive_manager.download_and_extract_database( + progress_callback + ) if success: self._settings.set("enable_metadata_archive_db", True) await self._metadata_provider_updater() - return web.json_response({"success": True, "message": "Metadata archive database downloaded and extracted successfully"}) - return web.json_response({"success": False, "error": "Failed to download and extract metadata archive database"}, status=500) + return web.json_response( + { + "success": True, + "message": "Metadata archive database downloaded and extracted successfully", + } + ) + return web.json_response( + { + "success": False, + "error": "Failed to download and extract metadata archive database", + }, + status=500, + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Error downloading metadata archive: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -822,8 +1036,19 @@ class MetadataArchiveHandler: if success: self._settings.set("enable_metadata_archive_db", False) await self._metadata_provider_updater() - return web.json_response({"success": True, "message": "Metadata archive database removed successfully"}) - return web.json_response({"success": False, "error": "Failed to remove metadata archive database"}, status=500) + return web.json_response( + { + "success": True, + "message": "Metadata archive database removed successfully", + } + ) + return web.json_response( + { + "success": False, + "error": "Failed to remove metadata archive database", + }, + status=500, + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Error removing metadata archive: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -844,11 +1069,15 @@ class MetadataArchiveHandler: "isAvailable": is_available, "isEnabled": is_enabled, "databaseSize": db_size, - "databasePath": archive_manager.get_database_path() if is_available else None, + "databasePath": archive_manager.get_database_path() + if is_available + else None, } ) except Exception as exc: # pragma: no cover - defensive logging - logger.error("Error getting metadata archive status: %s", exc, exc_info=True) + logger.error( + "Error getting metadata archive status: %s", exc, exc_info=True + ) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -861,21 +1090,55 @@ class FileSystemHandler: data = await request.json() file_path = data.get("file_path") if not file_path: - return web.json_response({"success": False, "error": "Missing file_path parameter"}, status=400) + return web.json_response( + {"success": False, "error": "Missing file_path parameter"}, + status=400, + ) file_path = os.path.abspath(file_path) if not os.path.isfile(file_path): - return web.json_response({"success": False, "error": "File does not exist"}, status=404) + return web.json_response( + {"success": False, "error": "File does not exist"}, status=404 + ) if os.name == "nt": subprocess.Popen(["explorer", "/select,", file_path]) elif os.name == "posix": - if sys.platform == "darwin": + if _is_docker(): + return web.json_response( + { + "success": True, + "message": "Running in Docker: Path available for copying", + "path": file_path, + "mode": "clipboard", + } + ) + elif _is_wsl(): + windows_path = _wsl_to_windows_path(file_path) + if windows_path: + subprocess.Popen(["explorer.exe", "/select,", windows_path]) + else: + logger.error( + "Failed to convert WSL path to Windows path: %s", file_path + ) + return web.json_response( + { + "success": False, + "error": "Failed to open file location: path conversion error", + }, + status=500, + ) + elif sys.platform == "darwin": subprocess.Popen(["open", "-R", file_path]) else: folder = os.path.dirname(file_path) subprocess.Popen(["xdg-open", folder]) - return web.json_response({"success": True, "message": f"Opened folder and selected file: {file_path}"}) + return web.json_response( + { + "success": True, + "message": f"Opened folder and selected file: {file_path}", + } + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Failed to open file location: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -884,22 +1147,54 @@ class FileSystemHandler: try: settings_file = getattr(self._settings, "settings_file", None) if not settings_file: - return web.json_response({"success": False, "error": "Settings file not found"}, status=404) + return web.json_response( + {"success": False, "error": "Settings file not found"}, status=404 + ) settings_file = os.path.abspath(settings_file) if not os.path.isfile(settings_file): - return web.json_response({"success": False, "error": "Settings file does not exist"}, status=404) + return web.json_response( + {"success": False, "error": "Settings file does not exist"}, + status=404, + ) if os.name == "nt": subprocess.Popen(["explorer", "/select,", settings_file]) elif os.name == "posix": - if sys.platform == "darwin": + if _is_docker(): + return web.json_response( + { + "success": True, + "message": "Running in Docker: Path available for copying", + "path": settings_file, + "mode": "clipboard", + } + ) + elif _is_wsl(): + windows_path = _wsl_to_windows_path(settings_file) + if windows_path: + subprocess.Popen(["explorer.exe", "/select,", windows_path]) + else: + logger.error( + "Failed to convert WSL path to Windows path: %s", + settings_file, + ) + return web.json_response( + { + "success": False, + "error": "Failed to open settings location: path conversion error", + }, + status=500, + ) + elif sys.platform == "darwin": subprocess.Popen(["open", "-R", settings_file]) else: folder = os.path.dirname(settings_file) subprocess.Popen(["xdg-open", folder]) - return web.json_response({"success": True, "message": f"Opened settings folder: {settings_file}"}) + return web.json_response( + {"success": True, "message": f"Opened settings folder: {settings_file}"} + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Failed to open settings location: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -922,21 +1217,44 @@ class NodeRegistryHandler: data = await request.json() nodes = data.get("nodes", []) if not isinstance(nodes, list): - return web.json_response({"success": False, "error": "nodes must be a list"}, status=400) + return web.json_response( + {"success": False, "error": "nodes must be a list"}, status=400 + ) for index, node in enumerate(nodes): if not isinstance(node, dict): - return web.json_response({"success": False, "error": f"Node {index} must be an object"}, status=400) + return web.json_response( + {"success": False, "error": f"Node {index} must be an object"}, + status=400, + ) node_id = node.get("node_id") if node_id is None: - return web.json_response({"success": False, "error": f"Node {index} missing node_id parameter"}, status=400) + return web.json_response( + { + "success": False, + "error": f"Node {index} missing node_id parameter", + }, + status=400, + ) graph_id = node.get("graph_id") if graph_id is None: - return web.json_response({"success": False, "error": f"Node {index} missing graph_id parameter"}, status=400) + return web.json_response( + { + "success": False, + "error": f"Node {index} missing graph_id parameter", + }, + status=400, + ) graph_name = node.get("graph_name") try: node["node_id"] = int(node_id) except (TypeError, ValueError): - return web.json_response({"success": False, "error": f"Node {index} node_id must be an integer"}, status=400) + return web.json_response( + { + "success": False, + "error": f"Node {index} node_id must be an integer", + }, + status=400, + ) node["graph_id"] = str(graph_id) if graph_name is None: node["graph_name"] = None @@ -946,7 +1264,12 @@ class NodeRegistryHandler: node["graph_name"] = str(graph_name) await self._node_registry.register_nodes(nodes) - return web.json_response({"success": True, "message": f"{len(nodes)} nodes registered successfully"}) + return web.json_response( + { + "success": True, + "message": f"{len(nodes)} nodes registered successfully", + } + ) except Exception as exc: # pragma: no cover - defensive logging logger.error("Failed to register nodes: %s", exc, exc_info=True) return web.json_response({"success": False, "error": str(exc)}, status=500) @@ -994,7 +1317,10 @@ class NodeRegistryHandler: return web.json_response({"success": True, "data": registry_info}) except Exception as exc: # pragma: no cover - defensive logging logger.error("Failed to get registry: %s", exc, exc_info=True) - return web.json_response({"success": False, "error": "Internal Error", "message": str(exc)}, status=500) + return web.json_response( + {"success": False, "error": "Internal Error", "message": str(exc)}, + status=500, + ) async def update_node_widget(self, request: web.Request) -> web.Response: try: @@ -1004,10 +1330,15 @@ class NodeRegistryHandler: node_ids = data.get("node_ids") if not isinstance(widget_name, str) or not widget_name: - return web.json_response({"success": False, "error": "Missing widget_name parameter"}, status=400) + return web.json_response( + {"success": False, "error": "Missing widget_name parameter"}, + status=400, + ) if not isinstance(value, str) or not value: - return web.json_response({"success": False, "error": "Missing value parameter"}, status=400) + return web.json_response( + {"success": False, "error": "Missing value parameter"}, status=400 + ) if not isinstance(node_ids, list) or not node_ids: return web.json_response( @@ -1107,7 +1438,9 @@ class MiscHandlerSet: self.metadata_archive = metadata_archive self.filesystem = filesystem - def to_route_mapping(self) -> Mapping[str, Callable[[web.Request], Awaitable[web.StreamResponse]]]: + def to_route_mapping( + self, + ) -> Mapping[str, Callable[[web.Request], Awaitable[web.StreamResponse]]]: return { "health_check": self.health.health_check, "get_settings": self.settings.get_settings, diff --git a/static/js/components/shared/ModelModal.js b/static/js/components/shared/ModelModal.js index 2dfb86e9..2aa84ac7 100644 --- a/static/js/components/shared/ModelModal.js +++ b/static/js/components/shared/ModelModal.js @@ -1001,7 +1001,20 @@ async function openFileLocation(filePath) { body: JSON.stringify({ 'file_path': filePath }) }); if (!resp.ok) throw new Error('Failed to open file location'); - showToast('modals.model.openFileLocation.success', {}, 'success'); + + const data = await resp.json(); + + if (data.mode === 'clipboard' && data.path) { + try { + await navigator.clipboard.writeText(data.path); + showToast('modals.model.openFileLocation.copied', { path: data.path }, 'success'); + } catch (clipboardErr) { + console.warn('Clipboard API not available:', clipboardErr); + showToast('modals.model.openFileLocation.clipboardFallback', { path: data.path }, 'info'); + } + } else { + showToast('modals.model.openFileLocation.success', {}, 'success'); + } } catch (err) { showToast('modals.model.openFileLocation.failed', {}, 'error'); } diff --git a/static/js/managers/SettingsManager.js b/static/js/managers/SettingsManager.js index 63d2b112..dd120ac2 100644 --- a/static/js/managers/SettingsManager.js +++ b/static/js/managers/SettingsManager.js @@ -364,7 +364,19 @@ export class SettingsManager { throw new Error(`Request failed with status ${response.status}`); } - showToast('settings.openSettingsFileLocation.success', {}, 'success'); + const data = await response.json(); + + if (data.mode === 'clipboard' && data.path) { + try { + await navigator.clipboard.writeText(data.path); + showToast('settings.openSettingsFileLocation.copied', { path: data.path }, 'success'); + } catch (clipboardErr) { + console.warn('Clipboard API not available:', clipboardErr); + showToast('settings.openSettingsFileLocation.clipboardFallback', { path: data.path }, 'info'); + } + } else { + showToast('settings.openSettingsFileLocation.success', {}, 'success'); + } } catch (error) { console.error('Failed to open settings file location:', error); showToast('settings.openSettingsFileLocation.failed', {}, 'error'); diff --git a/tests/routes/test_misc_routes.py b/tests/routes/test_misc_routes.py index c0565f0c..bdd3448e 100644 --- a/tests/routes/test_misc_routes.py +++ b/tests/routes/test_misc_routes.py @@ -1,5 +1,8 @@ import json +import os +import subprocess from types import SimpleNamespace +from unittest.mock import patch, MagicMock import pytest from aiohttp import web @@ -11,6 +14,9 @@ from py.routes.handlers.misc_handlers import ( NodeRegistryHandler, ServiceRegistryAdapter, SettingsHandler, + _is_wsl, + _wsl_to_windows_path, + _is_docker, ) from py.routes.misc_route_registrar import MISC_ROUTE_DEFINITIONS, MiscRouteRegistrar from py.routes.misc_routes import MiscRoutes @@ -114,11 +120,15 @@ def test_misc_route_registrar_registers_all_routes(): async def dummy_handler(_request): return web.Response() - handler_mapping = {definition.handler_name: dummy_handler for definition in MISC_ROUTE_DEFINITIONS} + handler_mapping = { + definition.handler_name: dummy_handler for definition in MISC_ROUTE_DEFINITIONS + } registrar.register_routes(handler_mapping) registered = {(method, path) for method, path, _ in app.router.calls} - expected = {(definition.method, definition.path) for definition in MISC_ROUTE_DEFINITIONS} + expected = { + (definition.method, definition.path) for definition in MISC_ROUTE_DEFINITIONS + } assert registered == expected @@ -236,7 +246,10 @@ async def test_register_nodes_includes_capabilities(): "graph_id": "root", "type": "CheckpointLoaderSimple", "title": "Checkpoint Loader", - "capabilities": {"supports_lora": False, "widget_names": ["ckpt_name", "", 42]}, + "capabilities": { + "supports_lora": False, + "widget_names": ["ckpt_name", "", 42], + }, } ] } @@ -249,7 +262,10 @@ async def test_register_nodes_includes_capabilities(): registry = await node_registry.get_registry() stored_node = next(iter(registry["nodes"].values())) - assert stored_node["capabilities"] == {"supports_lora": False, "widget_names": ["ckpt_name"]} + assert stored_node["capabilities"] == { + "supports_lora": False, + "widget_names": ["ckpt_name"], + } assert stored_node["widget_names"] == ["ckpt_name"] @@ -286,7 +302,12 @@ async def test_update_node_widget_sends_payload(): assert send_calls == [ ( "lm_widget_update", - {"id": 12, "widget_name": "ckpt_name", "value": "models/checkpoints/model.ckpt", "graph_id": "root"}, + { + "id": 12, + "widget_name": "ckpt_name", + "value": "models/checkpoints/model.ckpt", + "graph_id": "root", + }, ) ] @@ -428,7 +449,9 @@ async def test_misc_routes_bind_produces_expected_handlers(): controller = MiscRoutes( settings_service=DummySettings(), - usage_stats_factory=lambda: SimpleNamespace(process_execution=noop_async, get_stats=noop_async), + usage_stats_factory=lambda: SimpleNamespace( + process_execution=noop_async, get_stats=noop_async + ), prompt_server=FakePromptServer, service_registry_adapter=service_registry_adapter, metadata_provider_factory=fake_metadata_provider_factory, @@ -545,7 +568,9 @@ async def test_get_civitai_user_models_marks_library_versions(): metadata_provider_factory=provider_factory, ) - response = await handler.get_civitai_user_models(FakeRequest(query={"username": "pixel"})) + response = await handler.get_civitai_user_models( + FakeRequest(query={"username": "pixel"}) + ) payload = json.loads(response.text) assert payload["success"] is True @@ -657,12 +682,19 @@ async def test_get_civitai_user_models_rewrites_civitai_previews(): metadata_provider_factory=provider_factory, ) - response = await handler.get_civitai_user_models(FakeRequest(query={"username": "pixel"})) + response = await handler.get_civitai_user_models( + FakeRequest(query={"username": "pixel"}) + ) payload = json.loads(response.text) assert payload["success"] is True - previews_by_version = {item["versionId"]: item["thumbnailUrl"] for item in payload["versions"]} - assert previews_by_version[100] == "https://image.civitai.com/container/example/width=450,optimized=true/sample.jpeg" + previews_by_version = { + item["versionId"]: item["thumbnailUrl"] for item in payload["versions"] + } + assert ( + previews_by_version[100] + == "https://image.civitai.com/container/example/width=450,optimized=true/sample.jpeg" + ) assert ( previews_by_version[101] == "https://image.civitai.com/container/example/transcode=true,width=450,optimized=true/sample.mp4" @@ -706,7 +738,9 @@ def test_ensure_handler_mapping_caches_result(): controller = MiscRoutes( settings_service=DummySettings(), - usage_stats_factory=lambda: SimpleNamespace(process_execution=noop_async, get_stats=noop_async), + usage_stats_factory=lambda: SimpleNamespace( + process_execution=noop_async, get_stats=noop_async + ), prompt_server=FakePromptServer, service_registry_adapter=ServiceRegistryAdapter( get_lora_scanner=fake_scanner_factory, @@ -723,15 +757,17 @@ def test_ensure_handler_mapping_caches_result(): first_mapping = controller._ensure_handler_mapping() second_mapping = controller._ensure_handler_mapping() - assert first_mapping is second_mapping, "Expected cached handler mapping to be reused" + assert first_mapping is second_mapping, ( + "Expected cached handler mapping to be reused" + ) assert len(call_records) == 1, "Handler set factory should only be invoked once" @pytest.mark.asyncio async def test_check_model_exists_returns_local_versions(): versions = [ - {'versionId': 11, 'name': 'v1', 'fileName': 'model-one'}, - {'versionId': 12, 'name': 'v2', 'fileName': 'model-two'}, + {"versionId": 11, "name": "v1", "fileName": "model-one"}, + {"versionId": 12, "name": "v2", "fileName": "model-two"}, ] lora_scanner = RecordingVersionScanner(versions) @@ -756,12 +792,12 @@ async def test_check_model_exists_returns_local_versions(): metadata_provider_factory=fake_metadata_provider_factory, ) - response = await handler.check_model_exists(FakeRequest(query={'modelId': '5'})) + response = await handler.check_model_exists(FakeRequest(query={"modelId": "5"})) payload = json.loads(response.text) - assert payload['success'] is True - assert payload['modelType'] == 'lora' - assert payload['versions'] == versions + assert payload["success"] is True + assert payload["modelType"] == "lora" + assert payload["versions"] == versions assert lora_scanner.version_calls == [5] @@ -814,3 +850,119 @@ def test_create_handler_set_uses_provided_dependencies(): assert node_registry_handler._node_registry is fake_node_registry assert node_registry_handler._prompt_server is CustomPromptServer assert node_registry_handler._standalone_mode is True + + +def test_is_wsl_returns_true_in_wsl_environment(): + version_content = "Linux version 6.6.87.2-microsoft-standard-WSL2" + with patch("py.routes.handlers.misc_handlers.open") as mock_open: + mock_file = MagicMock() + mock_file.read.return_value = version_content + mock_open.return_value.__enter__ = MagicMock(return_value=mock_file) + mock_open.return_value.__exit__ = MagicMock(return_value=False) + result = _is_wsl() + assert result is True + + +def test_is_wsl_returns_false_in_non_wsl_environment(): + version_content = "Linux version 6.6.0-25-generic #26-Ubuntu SMP PREEMPT_DYNAMIC" + with patch("py.routes.handlers.misc_handlers.open") as mock_open: + mock_file = MagicMock() + mock_file.read.return_value = version_content + mock_open.return_value.__enter__ = MagicMock(return_value=mock_file) + mock_open.return_value.__exit__ = MagicMock(return_value=False) + result = _is_wsl() + assert result is False + + +def test_is_wsl_returns_false_on_read_error(): + with patch("py.routes.handlers.misc_handlers.open", side_effect=OSError()): + result = _is_wsl() + assert result is False + + +def test_is_wsl_returns_false_on_read_error(): + with patch("builtins.open", side_effect=OSError()): + result = _is_wsl() + assert result is False + + +def test_wsl_to_windows_path_converts_successfully(): + with patch("subprocess.run") as mock_run: + mock_result = MagicMock() + mock_result.stdout = "C:\\Users\\test\\file.txt\n" + mock_run.return_value = mock_result + + result = _wsl_to_windows_path("/mnt/c/test") + assert result == "C:\\Users\\test\\file.txt" + mock_run.assert_called_once() + + +def test_wsl_to_windows_path_returns_none_on_error(): + with patch("subprocess.run", side_effect=FileNotFoundError()): + result = _wsl_to_windows_path("/mnt/c/test") + assert result is None + + +def test_wsl_to_windows_path_returns_none_on_subprocess_error(): + with patch( + "subprocess.run", side_effect=subprocess.CalledProcessError(1, "wslpath") + ): + result = _wsl_to_windows_path("/mnt/c/test") + assert result is None + + +def test_is_docker_returns_true_when_dockerenv_exists(): + with patch("os.path.exists", return_value=True): + result = _is_docker() + assert result is True + + +def test_is_docker_checks_cgroup_when_dockerenv_missing(): + cgroup_content = "1:name=systemd:/docker/abc123\n" + with patch("os.path.exists", return_value=False): + with patch("py.routes.handlers.misc_handlers.open") as mock_open: + mock_file = MagicMock() + mock_file.read.return_value = cgroup_content + mock_open.return_value.__enter__ = MagicMock(return_value=mock_file) + mock_open.return_value.__exit__ = MagicMock(return_value=False) + result = _is_docker() + assert result is True + + +def test_is_docker_detects_kubernetes(): + cgroup_content = "12:pids:/kubepods/besteffort/pod123/abc123\n" + with patch("os.path.exists", return_value=False): + with patch("py.routes.handlers.misc_handlers.open") as mock_open: + mock_file = MagicMock() + mock_file.read.return_value = cgroup_content + mock_open.return_value.__enter__ = MagicMock(return_value=mock_file) + mock_open.return_value.__exit__ = MagicMock(return_value=False) + result = _is_docker() + assert result is True + + +def test_is_docker_returns_false_when_no_docker_detected(): + cgroup_content = "1:name=systemd:/user.slice/user-1000.slice\n" + with patch("os.path.exists", return_value=False): + with patch("py.routes.handlers.misc_handlers.open") as mock_open: + mock_file = MagicMock() + mock_file.read.return_value = cgroup_content + mock_open.return_value.__enter__ = MagicMock(return_value=mock_file) + mock_open.return_value.__exit__ = MagicMock(return_value=False) + result = _is_docker() + assert result is False + + +def test_is_docker_returns_false_on_cgroup_read_error(): + with patch("os.path.exists", return_value=False): + with patch("py.routes.handlers.misc_handlers.open", side_effect=OSError()): + result = _is_docker() + assert result is False + + +def test_wsl_to_windows_path_returns_none_on_subprocess_error(tmp_path): + with patch( + "subprocess.run", side_effect=subprocess.CalledProcessError(1, "wslpath") + ): + result = _wsl_to_windows_path("/mnt/c/test") + assert result is None