mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-25 07:05:43 -03:00
Add delete confirmation modal and update related styles and logic
This commit is contained in:
168
nodes.py
168
nodes.py
@@ -10,6 +10,7 @@ from .utils.file_utils import get_file_info, save_metadata, load_metadata, updat
|
||||
from .utils.lora_metadata import extract_lora_metadata
|
||||
from typing import Dict, Optional
|
||||
from .services.civitai_client import CivitaiClient
|
||||
import folder_paths
|
||||
|
||||
class LorasEndpoint:
|
||||
def __init__(self):
|
||||
@@ -19,9 +20,12 @@ class LorasEndpoint:
|
||||
),
|
||||
autoescape=True
|
||||
)
|
||||
# 配置Loras根目录(根据实际安装位置调整)
|
||||
self.loras_root = os.path.join(Path(__file__).parents[2], "models", "loras")
|
||||
# 添加 server 属性
|
||||
# Configure Loras root directories (from ComfyUI folder paths settings)
|
||||
self.loras_roots = [path.replace(os.sep, "/") for path in folder_paths.get_folder_paths("loras") if os.path.exists(path)]
|
||||
if not self.loras_roots:
|
||||
raise ValueError("No valid loras folders found")
|
||||
print(f"Loras roots: {self.loras_roots}") # debug log
|
||||
|
||||
self.server = PromptServer.instance
|
||||
|
||||
@classmethod
|
||||
@@ -29,9 +33,16 @@ class LorasEndpoint:
|
||||
instance = cls()
|
||||
app = PromptServer.instance.app
|
||||
static_path = os.path.join(os.path.dirname(__file__), 'static')
|
||||
|
||||
# Generate multiple static paths based on the number of folders in instance.loras_roots
|
||||
for idx, root in enumerate(instance.loras_roots, start=1):
|
||||
# Create different static paths for each folder, like /loras_static/root1/preview
|
||||
preview_path = f'/loras_static/root{idx}/preview'
|
||||
app.add_routes([web.static(preview_path, root)])
|
||||
|
||||
app.add_routes([
|
||||
web.get('/loras', instance.handle_loras_request),
|
||||
web.static('/loras_static/previews', instance.loras_root),
|
||||
# web.static('/loras_static/previews', instance.loras_root),
|
||||
web.static('/loras_static', static_path),
|
||||
web.post('/api/delete_model', instance.delete_model),
|
||||
web.post('/api/fetch-civitai', instance.fetch_civitai),
|
||||
@@ -52,34 +63,35 @@ class LorasEndpoint:
|
||||
|
||||
async def scan_loras(self):
|
||||
loras = []
|
||||
for root, _, files in os.walk(self.loras_root):
|
||||
safetensors_files = [f for f in files if f.endswith('.safetensors')]
|
||||
total_files = len(safetensors_files)
|
||||
|
||||
for idx, filename in enumerate(safetensors_files, 1):
|
||||
self.send_progress(idx, total_files, f"Scanning: {filename}")
|
||||
for loras_root in self.loras_roots:
|
||||
for root, _, files in os.walk(loras_root):
|
||||
safetensors_files = [f for f in files if f.endswith('.safetensors')]
|
||||
total_files = len(safetensors_files)
|
||||
|
||||
file_path = os.path.join(root, filename)
|
||||
|
||||
# Try to load existing metadata first
|
||||
metadata = await load_metadata(file_path, self.loras_root)
|
||||
|
||||
if metadata is None:
|
||||
# Only get file info and extract metadata if no existing metadata
|
||||
metadata = await get_file_info(file_path, self.loras_root)
|
||||
base_model_info = await extract_lora_metadata(file_path)
|
||||
metadata.base_model = base_model_info['base_model']
|
||||
await save_metadata(file_path, metadata)
|
||||
|
||||
# Convert to dict for API response
|
||||
lora_data = metadata.to_dict()
|
||||
# Get relative path and remove filename to get just the folder structure
|
||||
rel_path = os.path.relpath(file_path, self.loras_root)
|
||||
folder = os.path.dirname(rel_path)
|
||||
# Ensure forward slashes for consistency across platforms
|
||||
lora_data['folder'] = folder.replace(os.path.sep, '/')
|
||||
|
||||
loras.append(lora_data)
|
||||
for idx, filename in enumerate(safetensors_files, 1):
|
||||
self.send_progress(idx, total_files, f"Scanning: {filename}")
|
||||
|
||||
file_path = os.path.join(root, filename).replace(os.sep, "/")
|
||||
|
||||
# Try to load existing metadata first
|
||||
metadata = await load_metadata(file_path)
|
||||
|
||||
if metadata is None:
|
||||
# Only get file info and extract metadata if no existing metadata
|
||||
metadata = await get_file_info(file_path)
|
||||
base_model_info = await extract_lora_metadata(file_path)
|
||||
metadata.base_model = base_model_info['base_model']
|
||||
await save_metadata(file_path, metadata)
|
||||
|
||||
# Convert to dict for API response
|
||||
lora_data = metadata.to_dict()
|
||||
# Get relative path and remove filename to get just the folder structure
|
||||
rel_path = os.path.relpath(file_path, loras_root)
|
||||
folder = os.path.dirname(rel_path)
|
||||
# Ensure forward slashes for consistency across platforms
|
||||
lora_data['folder'] = folder.replace(os.path.sep, '/')
|
||||
|
||||
loras.append(lora_data)
|
||||
|
||||
self.send_progress(total_files, total_files, "Scan completed")
|
||||
return loras
|
||||
@@ -89,13 +101,6 @@ class LorasEndpoint:
|
||||
"""清理HTML格式的描述"""
|
||||
return desc.replace("<p>", "").replace("</p>", "\n").strip()
|
||||
|
||||
async def get_preview_url(self, preview_path, root_dir):
|
||||
"""生成预览图URL"""
|
||||
if os.path.exists(preview_path):
|
||||
rel_path = os.path.relpath(preview_path, self.loras_root)
|
||||
return f"/loras_static/previews/{rel_path.replace(os.sep, '/')}"
|
||||
return "/loras_static/images/no-preview.png"
|
||||
|
||||
async def handle_loras_request(self, request):
|
||||
"""处理Loras请求并渲染模板"""
|
||||
try:
|
||||
@@ -145,29 +150,43 @@ class LorasEndpoint:
|
||||
return {
|
||||
"model_name": lora["model_name"],
|
||||
"file_name": lora["file_name"],
|
||||
"preview_url": lora["preview_url"],
|
||||
"preview_url": self.get_static_url_for_preview(lora["preview_url"]),
|
||||
"base_model": lora["base_model"],
|
||||
"folder": lora["folder"],
|
||||
"sha256": lora["sha256"],
|
||||
"file_path": lora["file_path"],
|
||||
"file_path": lora["file_path"].replace(os.sep, "/"),
|
||||
"modified": lora["modified"],
|
||||
"from_civitai": lora.get("from_civitai", True),
|
||||
"civitai": self.filter_civitai_data(lora.get("civitai", {}))
|
||||
}
|
||||
|
||||
def get_static_url_for_preview(self, preview_url):
|
||||
"""
|
||||
Determines which loras_root the preview_url belongs to and
|
||||
returns the corresponding static URL.
|
||||
"""
|
||||
for idx, root in enumerate(self.loras_roots, start=1):
|
||||
# Check if preview_url belongs to current root
|
||||
if preview_url.startswith(root):
|
||||
# Get relative path and generate static URL
|
||||
relative_path = os.path.relpath(preview_url, root)
|
||||
static_url = f'/loras_static/root{idx}/preview/{relative_path.replace(os.sep, "/")}'
|
||||
return static_url
|
||||
|
||||
# If no matching root found, return empty string
|
||||
return ""
|
||||
|
||||
|
||||
async def delete_model(self, request):
|
||||
try:
|
||||
data = await request.json()
|
||||
file_name = data.get('file_name')
|
||||
folder = data.get('folder') # 从请求中获取folder信息
|
||||
if not file_name:
|
||||
return web.Response(text='Model name is required', status=400)
|
||||
file_path = data.get('file_path') # 从请求中获取file_path信息
|
||||
if not file_path:
|
||||
return web.Response(text='Model full path is required', status=400)
|
||||
|
||||
# 构建完整的目录路径
|
||||
target_dir = self.loras_root
|
||||
if folder and folder != "root":
|
||||
target_dir = os.path.join(self.loras_root, folder)
|
||||
target_dir = os.path.dirname(file_path)
|
||||
file_name = os.path.splitext(os.path.basename(file_path))[0]
|
||||
|
||||
# List of file patterns to delete
|
||||
required_file = f"{file_name}.safetensors" # 主文件必须存在
|
||||
@@ -176,7 +195,13 @@ class LorasEndpoint:
|
||||
f"{file_name}.preview.png",
|
||||
f"{file_name}.preview.jpg",
|
||||
f"{file_name}.preview.jpeg",
|
||||
f"{file_name}.preview.webp"
|
||||
f"{file_name}.preview.webp",
|
||||
f"{file_name}.preview.mp4",
|
||||
f"{file_name}.png",
|
||||
f"{file_name}.jpg",
|
||||
f"{file_name}.jpeg",
|
||||
f"{file_name}.webp",
|
||||
f"{file_name}.mp4"
|
||||
]
|
||||
|
||||
deleted_files = []
|
||||
@@ -201,7 +226,7 @@ class LorasEndpoint:
|
||||
except Exception as e:
|
||||
print(f"Error deleting optional file {file_path}: {str(e)}")
|
||||
else:
|
||||
return web.Response(text=f"Model file {required_file} not found in {folder}", status=404)
|
||||
return web.Response(text=f"Model file {required_file} not found in {target_dir}", status=404)
|
||||
|
||||
return web.json_response({
|
||||
'success': True,
|
||||
@@ -268,7 +293,7 @@ class LorasEndpoint:
|
||||
|
||||
# 4. 下载预览图
|
||||
# Check if existing preview is valid and the file exists
|
||||
if not local_metadata.get('preview_url') or not os.path.exists(os.path.join(self.loras_root, local_metadata['preview_url'].replace('/', os.sep))):
|
||||
if not local_metadata.get('preview_url') or not os.path.exists(local_metadata['preview_url']):
|
||||
first_preview = next((img for img in civitai_metadata.get('images', [])), None)
|
||||
if first_preview:
|
||||
|
||||
@@ -277,7 +302,7 @@ class LorasEndpoint:
|
||||
preview_path = os.path.join(os.path.dirname(data['file_path']), preview_filename)
|
||||
await client.download_preview_image(first_preview['url'], preview_path)
|
||||
# 存储相对路径,使用正斜杠格式
|
||||
local_metadata['preview_url'] = os.path.relpath(preview_path, self.loras_root).replace(os.sep, '/')
|
||||
local_metadata['preview_url'] = preview_path.replace(os.sep, '/')
|
||||
|
||||
# 5. 保存更新后的元数据
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
@@ -307,28 +332,22 @@ class LorasEndpoint:
|
||||
try:
|
||||
reader = await request.multipart()
|
||||
|
||||
# Get the file field first
|
||||
# Get the preview_file field first
|
||||
file_field = await reader.next()
|
||||
if file_field.name != 'file':
|
||||
raise ValueError("Expected 'file' field first")
|
||||
file_data = await file_field.read()
|
||||
if file_field.name != 'preview_file':
|
||||
raise ValueError("Expected 'preview_file' field first")
|
||||
preview_data = await file_field.read()
|
||||
|
||||
# Get the file name field
|
||||
# Get the file model_path field
|
||||
name_field = await reader.next()
|
||||
if name_field.name != 'file_name':
|
||||
raise ValueError("Expected 'file_name' field second")
|
||||
file_name = (await name_field.read()).decode()
|
||||
|
||||
# Get the folder field
|
||||
folder_field = await reader.next()
|
||||
if folder_field.name != 'folder':
|
||||
raise ValueError("Expected 'folder' field third")
|
||||
folder = (await folder_field.read()).decode()
|
||||
if name_field.name != 'model_path':
|
||||
raise ValueError("Expected 'model_path' field second")
|
||||
model_path = (await name_field.read()).decode()
|
||||
|
||||
# Get the content type from the file field headers
|
||||
content_type = file_field.headers.get('Content-Type', '')
|
||||
|
||||
print(f"Received preview file: {file_name} ({content_type})") # Debug log
|
||||
print(f"Received preview file: {model_path} ({content_type})") # Debug log
|
||||
|
||||
# Determine file extension based on content type
|
||||
if content_type.startswith('video/'):
|
||||
@@ -337,29 +356,34 @@ class LorasEndpoint:
|
||||
extension = '.preview.png'
|
||||
|
||||
# Construct the preview file path
|
||||
base_name = os.path.splitext(file_name)[0] # Remove original extension
|
||||
base_name = os.path.splitext(os.path.basename(model_path))[0] # Remove original extension
|
||||
preview_name = base_name + extension
|
||||
preview_path = os.path.join(self.loras_root, folder, preview_name)
|
||||
# Get the folder path from the model_path
|
||||
folder = os.path.dirname(model_path)
|
||||
preview_path = os.path.join(folder, preview_name).replace(os.sep, '/')
|
||||
|
||||
# Save the preview file
|
||||
with open(preview_path, 'wb') as f:
|
||||
f.write(file_data)
|
||||
f.write(preview_data)
|
||||
|
||||
# Update metadata if it exists
|
||||
metadata_path = os.path.join(self.loras_root, folder, base_name + '.metadata.json')
|
||||
metadata_path = os.path.join(folder, base_name + '.metadata.json')
|
||||
if os.path.exists(metadata_path):
|
||||
try:
|
||||
with open(metadata_path, 'r', encoding='utf-8') as f:
|
||||
metadata = json.load(f)
|
||||
# Update the preview_url to match the new file name
|
||||
metadata['preview_url'] = os.path.join(folder, preview_name).replace(os.sep, '/')
|
||||
metadata['preview_url'] = preview_path
|
||||
with open(metadata_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
except Exception as e:
|
||||
print(f"Error updating metadata: {str(e)}")
|
||||
# Continue even if metadata update fails
|
||||
|
||||
return web.Response(status=200)
|
||||
return web.json_response({
|
||||
"success": True,
|
||||
"preview_url": self.get_static_url_for_preview(preview_path)
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error replacing preview: {str(e)}")
|
||||
|
||||
Reference in New Issue
Block a user