Add local availability check for Civitai model versions; enhance download manager UI to indicate local status

This commit is contained in:
Will Miao
2025-03-06 20:45:09 +08:00
parent 5c521e40d4
commit c9c86d8c0f
12 changed files with 1192 additions and 1101 deletions

View File

@@ -4,7 +4,7 @@ import os
import json
import logging
from email.parser import Parser
from typing import Optional, Dict, Tuple
from typing import Optional, Dict, Tuple, List
from urllib.parse import unquote
from ..utils.models import LoraMetadata
@@ -135,16 +135,15 @@ class CivitaiClient:
print(f"Download Error: {str(e)}")
return False
async def get_model_versions(self, model_id: str) -> Optional[Dict]:
"""Fetch all versions of a model"""
async def get_model_versions(self, model_id: str) -> List[Dict]:
"""Get all versions of a model with local availability info"""
try:
session = await self.session
url = f"{self.base_url}/models/{model_id}"
async with session.get(url, headers=self.headers) as response:
if response.status == 200:
data = await response.json()
return data.get('modelVersions', [])
return None
session = await self.session # 等待获取 session
async with session.get(f"{self.base_url}/models/{model_id}") as response:
if response.status != 200:
return None
data = await response.json()
return data.get('modelVersions', [])
except Exception as e:
logger.error(f"Error fetching model versions: {e}")
return None

View File

@@ -86,26 +86,32 @@ class LoraFileHandler(FileSystemEventHandler):
if not changes:
return
logger.info(f"Processing {len(changes)} file changes")
cache = await self.scanner.get_cached_data() # 先完成可能的初始化
cache = await self.scanner.get_cached_data()
needs_resort = False
new_folders = set() # 用于收集新的文件夹
new_folders = set()
for action, file_path in changes:
try:
if action == 'add':
# 扫描新文件
# Scan new file
lora_data = await self.scanner.scan_single_lora(file_path)
if lora_data:
cache.raw_data.append(lora_data)
new_folders.add(lora_data['folder']) # 收集新文件夹
new_folders.add(lora_data['folder'])
# Update hash index
if 'sha256' in lora_data:
self.scanner._hash_index.add_entry(
lora_data['sha256'],
lora_data['file_path']
)
needs_resort = True
elif action == 'remove':
# 从缓存中移除
# Remove from cache and hash index
logger.info(f"Removing {file_path} from cache")
self.scanner._hash_index.remove_by_path(file_path)
cache.raw_data = [
item for item in cache.raw_data
if item['file_path'] != file_path
@@ -118,7 +124,7 @@ class LoraFileHandler(FileSystemEventHandler):
if needs_resort:
await cache.resort()
# 更新文件夹列表,包括新添加的文件夹
# Update folder list
all_folders = set(cache.folders) | new_folders
cache.folders = sorted(list(all_folders), key=lambda x: x.lower())

View File

@@ -0,0 +1,48 @@
from typing import Dict, Optional
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class LoraHashIndex:
"""Index for mapping LoRA file hashes to their file paths"""
def __init__(self):
self._hash_to_path: Dict[str, str] = {}
def add_entry(self, sha256: str, file_path: str) -> None:
"""Add or update a hash -> path mapping"""
if not sha256 or not file_path:
return
self._hash_to_path[sha256] = file_path
def remove_entry(self, sha256: str) -> None:
"""Remove a hash entry"""
self._hash_to_path.pop(sha256, None)
def remove_by_path(self, file_path: str) -> None:
"""Remove entry by file path"""
for sha256, path in list(self._hash_to_path.items()):
if path == file_path:
del self._hash_to_path[sha256]
break
def get_path(self, sha256: str) -> Optional[str]:
"""Get file path for a given hash"""
return self._hash_to_path.get(sha256)
def get_hash(self, file_path: str) -> Optional[str]:
"""Get hash for a given file path"""
for sha256, path in self._hash_to_path.items():
if path == file_path:
return sha256
return None
def has_hash(self, sha256: str) -> bool:
"""Check if hash exists in index"""
return sha256 in self._hash_to_path
def clear(self) -> None:
"""Clear all entries"""
self._hash_to_path.clear()

View File

@@ -10,6 +10,7 @@ from ..config import config
from ..utils.file_utils import load_metadata, get_file_info
from .lora_cache import LoraCache
from difflib import SequenceMatcher
from .lora_hash_index import LoraHashIndex
logger = logging.getLogger(__name__)
@@ -28,6 +29,7 @@ class LoraScanner:
# 确保初始化只执行一次
if not hasattr(self, '_initialized'):
self._cache: Optional[LoraCache] = None
self._hash_index = LoraHashIndex()
self._initialization_lock = asyncio.Lock()
self._initialization_task: Optional[asyncio.Task] = None
self._initialized = True
@@ -85,9 +87,17 @@ class LoraScanner:
async def _initialize_cache(self) -> None:
"""Initialize or refresh the cache"""
try:
# Clear existing hash index
self._hash_index.clear()
# Scan for new data
raw_data = await self.scan_all_loras()
# Build hash index
for lora_data in raw_data:
if 'sha256' in lora_data and 'file_path' in lora_data:
self._hash_index.add_entry(lora_data['sha256'], lora_data['file_path'])
# Update cache
self._cache = LoraCache(
raw_data=raw_data,
@@ -416,13 +426,23 @@ class LoraScanner:
async def update_single_lora_cache(self, original_path: str, new_path: str, metadata: Dict) -> bool:
cache = await self.get_cached_data()
# Remove old path from hash index if exists
self._hash_index.remove_by_path(original_path)
cache.raw_data = [
item for item in cache.raw_data
if item['file_path'] != original_path
]
item for item in cache.raw_data
if item['file_path'] != original_path
]
if metadata:
metadata['folder'] = self._calculate_folder(new_path)
cache.raw_data.append(metadata)
# Update hash index with new path
if 'sha256' in metadata:
self._hash_index.add_entry(metadata['sha256'], new_path)
all_folders = set(cache.folders)
all_folders.add(metadata['folder'])
cache.folders = sorted(list(all_folders), key=lambda x: x.lower())
@@ -430,7 +450,6 @@ class LoraScanner:
# Resort cache
await cache.resort()
async def _update_metadata_paths(self, metadata_path: str, lora_path: str) -> Dict:
"""Update file paths in metadata file"""
try:
@@ -457,3 +476,16 @@ class LoraScanner:
except Exception as e:
logger.error(f"Error updating metadata paths: {e}", exc_info=True)
# Add new methods for hash index functionality
def has_lora_hash(self, sha256: str) -> bool:
"""Check if a LoRA with given hash exists"""
return self._hash_index.has_hash(sha256)
def get_lora_path_by_hash(self, sha256: str) -> Optional[str]:
"""Get file path for a LoRA by its hash"""
return self._hash_index.get_path(sha256)
def get_lora_hash_by_path(self, file_path: str) -> Optional[str]:
"""Get hash for a LoRA by its file path"""
return self._hash_index.get_hash(file_path)