mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-05-06 16:36:45 -03:00
315 lines
12 KiB
Python
315 lines
12 KiB
Python
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from urllib.parse import quote
|
|
|
|
from aiohttp import web
|
|
from ..services.settings_manager import get_settings_manager
|
|
from ..utils.example_images_paths import (
|
|
get_model_folder,
|
|
)
|
|
from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
_WINDOWS_DRIVE_PATTERN = re.compile(r"^[A-Za-z]:/")
|
|
|
|
|
|
def _is_within_root(path: str, root: str) -> bool:
|
|
try:
|
|
return os.path.commonpath([os.path.abspath(path), os.path.abspath(root)]) == os.path.abspath(root)
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def _join_local_example_path(local_root: str, relative_path: str) -> str:
|
|
separator = "\\" if "\\" in local_root and "/" not in local_root else "/"
|
|
normalized_root = local_root.rstrip("\\/")
|
|
normalized_relative = relative_path.replace("/", separator)
|
|
if not normalized_root:
|
|
return normalized_relative
|
|
return f"{normalized_root}{separator}{normalized_relative}"
|
|
|
|
|
|
def _build_file_uri(path: str) -> str:
|
|
normalized = path.replace("\\", "/")
|
|
if _WINDOWS_DRIVE_PATTERN.match(normalized):
|
|
return f"file:///{quote(normalized, safe='/:')}"
|
|
if normalized.startswith("/"):
|
|
return f"file://{quote(normalized, safe='/:')}"
|
|
return f"file:///{quote(normalized.lstrip('/'), safe='/:')}"
|
|
|
|
|
|
def _render_open_uri_template(template: str, local_path: str, relative_path: str) -> str:
|
|
file_uri = _build_file_uri(local_path)
|
|
replacements = {
|
|
"{{local_path}}": local_path,
|
|
"{{encoded_local_path}}": quote(local_path, safe=""),
|
|
"{{relative_path}}": relative_path,
|
|
"{{encoded_relative_path}}": quote(relative_path, safe=""),
|
|
"{{file_uri}}": file_uri,
|
|
"{{encoded_file_uri}}": quote(file_uri, safe=""),
|
|
}
|
|
|
|
rendered = template
|
|
for placeholder, value in replacements.items():
|
|
rendered = rendered.replace(placeholder, value)
|
|
return rendered
|
|
|
|
|
|
def _open_system_folder(model_folder: str) -> dict[str, object]:
|
|
if os.name == "nt": # Windows
|
|
os.startfile(model_folder)
|
|
elif os.name == "posix": # macOS and Linux
|
|
if sys.platform == "darwin": # macOS
|
|
subprocess.Popen(["open", model_folder])
|
|
else: # Linux
|
|
subprocess.Popen(["xdg-open", model_folder])
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Opened example images folder for {model_folder}",
|
|
"path": model_folder,
|
|
}
|
|
|
|
|
|
class ExampleImagesFileManager:
|
|
"""Manages access and operations for example image files"""
|
|
|
|
@staticmethod
|
|
async def open_folder(request):
|
|
"""
|
|
Open the example images folder for a specific model
|
|
|
|
Expects a JSON request body with:
|
|
{
|
|
"model_hash": "sha256_hash" # SHA256 hash of the model
|
|
}
|
|
"""
|
|
try:
|
|
# Parse request body
|
|
data = await request.json()
|
|
model_hash = data.get('model_hash')
|
|
|
|
if not model_hash:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'Missing model_hash parameter'
|
|
}, status=400)
|
|
|
|
# Get example images path from settings
|
|
settings_manager = get_settings_manager()
|
|
example_images_path = settings_manager.get('example_images_path')
|
|
if not example_images_path:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'No example images path configured. Please set it in the settings panel first.'
|
|
}, status=400)
|
|
|
|
# Construct folder path for this model
|
|
model_folder = get_model_folder(model_hash)
|
|
if not model_folder:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'Failed to resolve example images folder for this model.'
|
|
}, status=500)
|
|
|
|
# Path validation: ensure model_folder is under example_images_path
|
|
if not _is_within_root(model_folder, example_images_path):
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'Invalid model folder path'
|
|
}, status=400)
|
|
|
|
# Check if folder exists
|
|
if not os.path.exists(model_folder):
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'No example images found for this model. Download example images first.'
|
|
}, status=404)
|
|
|
|
root_path = os.path.abspath(example_images_path)
|
|
relative_path = os.path.relpath(model_folder, root_path).replace("\\", "/")
|
|
open_mode = settings_manager.get("example_images_open_mode") or "system"
|
|
|
|
if open_mode == "clipboard":
|
|
local_root = settings_manager.get("example_images_local_root") or root_path
|
|
local_path = _join_local_example_path(local_root, relative_path)
|
|
return web.json_response({
|
|
'success': True,
|
|
'mode': 'clipboard',
|
|
'path': local_path,
|
|
'relative_path': relative_path,
|
|
})
|
|
|
|
if open_mode == "uri_template":
|
|
local_root = settings_manager.get("example_images_local_root") or root_path
|
|
uri_template = settings_manager.get("example_images_open_uri_template") or ""
|
|
if not uri_template.strip():
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'No example image open URI template configured.'
|
|
}, status=400)
|
|
|
|
local_path = _join_local_example_path(local_root, relative_path)
|
|
return web.json_response({
|
|
'success': True,
|
|
'mode': 'uri',
|
|
'path': local_path,
|
|
'relative_path': relative_path,
|
|
'uri': _render_open_uri_template(uri_template, local_path, relative_path),
|
|
})
|
|
|
|
return web.json_response(_open_system_folder(model_folder))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to open example images folder: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': str(e)
|
|
}, status=500)
|
|
|
|
@staticmethod
|
|
async def get_files(request):
|
|
"""
|
|
Get the list of example image files for a specific model
|
|
|
|
Expects:
|
|
- model_hash in query parameters
|
|
|
|
Returns:
|
|
- List of image files and their paths
|
|
"""
|
|
try:
|
|
# Get model_hash from query parameters
|
|
model_hash = request.query.get('model_hash')
|
|
|
|
if not model_hash:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'Missing model_hash parameter'
|
|
}, status=400)
|
|
|
|
# Get example images path from settings
|
|
settings_manager = get_settings_manager()
|
|
example_images_path = settings_manager.get('example_images_path')
|
|
if not example_images_path:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'No example images path configured'
|
|
}, status=400)
|
|
|
|
# Construct folder path for this model
|
|
model_folder = get_model_folder(model_hash)
|
|
if not model_folder:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'Failed to resolve example images folder for this model'
|
|
}, status=500)
|
|
|
|
# Check if folder exists
|
|
if not os.path.exists(model_folder):
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'No example images found for this model',
|
|
'files': []
|
|
}, status=404)
|
|
|
|
# Get list of files in the folder
|
|
files = []
|
|
for file in os.listdir(model_folder):
|
|
file_path = os.path.join(model_folder, file)
|
|
if os.path.isfile(file_path):
|
|
# Check if file is a supported media file
|
|
file_ext = os.path.splitext(file)[1].lower()
|
|
if (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
|
|
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
|
|
relative_path = os.path.relpath(model_folder, os.path.abspath(example_images_path)).replace("\\", "/")
|
|
files.append({
|
|
'name': file,
|
|
'path': f'/example_images_static/{relative_path}/{file}',
|
|
'extension': file_ext,
|
|
'is_video': file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']
|
|
})
|
|
|
|
return web.json_response({
|
|
'success': True,
|
|
'files': files
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get example image files: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': str(e)
|
|
}, status=500)
|
|
|
|
@staticmethod
|
|
async def has_images(request):
|
|
"""
|
|
Check if the example images folder for a model exists and is not empty
|
|
|
|
Expects:
|
|
- model_hash in query parameters
|
|
|
|
Returns:
|
|
- Boolean indicating whether the folder exists and contains images/videos
|
|
"""
|
|
try:
|
|
# Get model_hash from query parameters
|
|
model_hash = request.query.get('model_hash')
|
|
|
|
if not model_hash:
|
|
return web.json_response({
|
|
'success': False,
|
|
'error': 'Missing model_hash parameter'
|
|
}, status=400)
|
|
|
|
# Get example images path from settings
|
|
settings_manager = get_settings_manager()
|
|
example_images_path = settings_manager.get('example_images_path')
|
|
if not example_images_path:
|
|
return web.json_response({
|
|
'has_images': False
|
|
})
|
|
|
|
# Construct folder path for this model
|
|
model_folder = get_model_folder(model_hash)
|
|
if not model_folder:
|
|
return web.json_response({
|
|
'has_images': False,
|
|
'error': 'Failed to resolve example images folder for this model'
|
|
})
|
|
|
|
# Check if folder exists
|
|
if not os.path.exists(model_folder) or not os.path.isdir(model_folder):
|
|
return web.json_response({
|
|
'has_images': False
|
|
})
|
|
|
|
# Check if folder contains any supported media files
|
|
for file in os.listdir(model_folder):
|
|
file_path = os.path.join(model_folder, file)
|
|
if os.path.isfile(file_path):
|
|
file_ext = os.path.splitext(file)[1].lower()
|
|
if (file_ext in SUPPORTED_MEDIA_EXTENSIONS['images'] or
|
|
file_ext in SUPPORTED_MEDIA_EXTENSIONS['videos']):
|
|
return web.json_response({
|
|
'has_images': True
|
|
})
|
|
|
|
# If reached here, folder exists but has no supported media files
|
|
return web.json_response({
|
|
'has_images': False
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to check example images folder: {e}", exc_info=True)
|
|
return web.json_response({
|
|
'has_images': False,
|
|
'error': str(e)
|
|
})
|