Refactor DownloadManager and LoraFileHandler for improved file monitoring

- Simplified the path handling in DownloadManager by directly adding normalized paths to the ignore list.
- Updated LoraFileHandler to utilize a set for ignore paths, enhancing performance and clarity.
- Implemented debouncing for modified file events to prevent duplicate processing and improve efficiency.
- Enhanced the handling of file creation, modification, and deletion events for .safetensors files, ensuring accurate processing and logging.
- Adjusted cache operations to streamline the addition and removal of files based on real paths.
This commit is contained in:
Will Miao
2025-04-06 22:27:55 +08:00
parent 0b55f61fac
commit f2d36f5be9
2 changed files with 156 additions and 163 deletions

View File

@@ -75,17 +75,10 @@ class DownloadManager:
file_size = file_info.get('sizeKB', 0) * 1024
# 4. 通知文件监控系统 - 使用规范化路径和文件大小
if self.file_monitor and self.file_monitor.handler:
# Add both the normalized path and potential alternative paths
normalized_path = save_path.replace(os.sep, '/')
self.file_monitor.handler.add_ignore_path(normalized_path, file_size)
# Also add the path with file extension variations (.safetensors)
if not normalized_path.endswith('.safetensors'):
safetensors_path = os.path.splitext(normalized_path)[0] + '.safetensors'
self.file_monitor.handler.add_ignore_path(safetensors_path, file_size)
logger.debug(f"Added download path to ignore list: {normalized_path} (size: {file_size} bytes)")
self.file_monitor.handler.add_ignore_path(
save_path.replace(os.sep, '/'),
file_size
)
# 5. 准备元数据
metadata = LoraMetadata.from_civitai_info(version_info, file_info, save_path)

View File

@@ -2,9 +2,10 @@ from operator import itemgetter
import os
import logging
import asyncio
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileDeletedEvent
from typing import List
from watchdog.events import FileSystemEventHandler
from typing import List, Dict, Set
from threading import Lock
from .lora_scanner import LoraScanner
from ..config import config
@@ -20,139 +21,174 @@ class LoraFileHandler(FileSystemEventHandler):
self.pending_changes = set() # 待处理的变更
self.lock = Lock() # 线程安全锁
self.update_task = None # 异步更新任务
self._ignore_paths = {} # Change to dictionary to store expiration times
self._ignore_paths = set() # Add ignore paths set
self._min_ignore_timeout = 5 # minimum timeout in seconds
self._download_speed = 1024 * 1024 # assume 1MB/s as base speed
# Track modified files with timestamps for debouncing
self.modified_files: Dict[str, float] = {}
self.debounce_timer = None
self.debounce_delay = 3.0 # seconds to wait after last modification
# Track files that are already scheduled for processing
self.scheduled_files: Set[str] = set()
def _should_ignore(self, path: str) -> bool:
"""Check if path should be ignored"""
real_path = os.path.realpath(path) # Resolve any symbolic links
normalized_path = real_path.replace(os.sep, '/')
# Also check with backslashes for Windows compatibility
alt_path = real_path.replace('/', '\\')
# 使用传入的事件循环而不是尝试获取当前线程的事件循环
current_time = self.loop.time()
# Check if path is in ignore list and not expired
if normalized_path in self._ignore_paths and self._ignore_paths[normalized_path] > current_time:
return True
# Also check alternative path format
if alt_path in self._ignore_paths and self._ignore_paths[alt_path] > current_time:
return True
return False
return real_path.replace(os.sep, '/') in self._ignore_paths
def add_ignore_path(self, path: str, file_size: int = 0):
"""Add path to ignore list with dynamic timeout based on file size"""
real_path = os.path.realpath(path) # Resolve any symbolic links
normalized_path = real_path.replace(os.sep, '/')
self._ignore_paths.add(real_path.replace(os.sep, '/'))
# Calculate timeout based on file size
# For small files, use minimum timeout
# For larger files, estimate download time + buffer
if file_size > 0:
# Estimate download time in seconds (size / speed) + buffer
estimated_time = (file_size / self._download_speed) + 10
timeout = max(self._min_ignore_timeout, estimated_time)
else:
timeout = self._min_ignore_timeout
current_time = self.loop.time()
expiration_time = current_time + timeout
# Store both normalized and alternative path formats
self._ignore_paths[normalized_path] = expiration_time
# Also store with backslashes for Windows compatibility
alt_path = real_path.replace('/', '\\')
self._ignore_paths[alt_path] = expiration_time
logger.debug(f"Added ignore path: {normalized_path} (expires in {timeout:.1f}s)")
# Short timeout (e.g. 5 seconds) is sufficient to ignore the CREATE event
timeout = 5
self.loop.call_later(
timeout,
self._remove_ignore_path,
normalized_path
self._ignore_paths.discard,
real_path.replace(os.sep, '/')
)
def _remove_ignore_path(self, path: str):
"""Remove path from ignore list after timeout"""
if path in self._ignore_paths:
del self._ignore_paths[path]
logger.debug(f"Removed ignore path: {path}")
# Also remove alternative path format
alt_path = path.replace('/', '\\')
if alt_path in self._ignore_paths:
del self._ignore_paths[alt_path]
def on_created(self, event):
if event.is_directory or not event.src_path.endswith('.safetensors'):
if event.is_directory:
return
if self._should_ignore(event.src_path):
return
# Check if file is still being downloaded
try:
file_size = os.path.getsize(event.src_path)
# Record the file path and size to handle potential deletion during download
self.add_ignore_path(event.src_path, file_size)
# Only process file if it exists and has non-zero size
if os.path.exists(event.src_path) and file_size > 0:
logger.info(f"LoRA file created: {event.src_path} (size: {file_size} bytes)")
# Handle safetensors files directly
if event.src_path.endswith('.safetensors'):
if self._should_ignore(event.src_path):
return
# We'll process this file directly and ignore subsequent modifications
# to prevent duplicate processing
normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/')
if normalized_path not in self.scheduled_files:
logger.info(f"LoRA file created: {event.src_path}")
self.scheduled_files.add(normalized_path)
self._schedule_update('add', event.src_path)
else:
logger.debug(f"Ignoring empty or non-existent file: {event.src_path}")
except FileNotFoundError:
# File disappeared between event and our check - likely a temporary download file
logger.debug(f"File disappeared before processing: {event.src_path}")
except Exception as e:
logger.error(f"Error processing create event for {event.src_path}: {str(e)}")
# Ignore modifications for a short period after creation
# This helps avoid duplicate processing
self.loop.call_later(
self.debounce_delay * 2,
self.scheduled_files.discard,
normalized_path
)
# For browser downloads, we'll catch them when they're renamed to .safetensors
def on_modified(self, event):
if event.is_directory:
return
# Only process safetensors files
if event.src_path.endswith('.safetensors'):
if self._should_ignore(event.src_path):
return
normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/')
# Skip if this file is already scheduled for processing
if normalized_path in self.scheduled_files:
return
# Update the timestamp for this file
self.modified_files[normalized_path] = time.time()
# Cancel any existing timer
if self.debounce_timer:
self.debounce_timer.cancel()
# Set a new timer to process modified files after debounce period
self.debounce_timer = self.loop.call_later(
self.debounce_delay,
self.loop.call_soon_threadsafe,
self._process_modified_files
)
def _process_modified_files(self):
"""Process files that have been modified after debounce period"""
current_time = time.time()
files_to_process = []
# Find files that haven't been modified for debounce_delay seconds
for file_path, last_modified in list(self.modified_files.items()):
if current_time - last_modified >= self.debounce_delay:
# Only process if not already scheduled
if file_path not in self.scheduled_files:
files_to_process.append(file_path)
self.scheduled_files.add(file_path)
# Auto-remove from scheduled list after reasonable time
self.loop.call_later(
self.debounce_delay * 2,
self.scheduled_files.discard,
file_path
)
del self.modified_files[file_path]
# Process stable files
for file_path in files_to_process:
logger.info(f"Processing modified LoRA file: {file_path}")
self._schedule_update('add', file_path)
def on_deleted(self, event):
if event.is_directory or not event.src_path.endswith('.safetensors'):
return
# If this path is in our ignore list, it might be part of a download process
# Don't remove it from the cache yet
if self._should_ignore(event.src_path):
logger.debug(f"Ignoring delete event for in-progress download: {event.src_path}")
return
# Remove from scheduled files if present
normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/')
self.scheduled_files.discard(normalized_path)
logger.info(f"LoRA file deleted: {event.src_path}")
self._schedule_update('remove', event.src_path)
def on_modified(self, event):
if event.is_directory or not event.src_path.endswith('.safetensors'):
return
if self._should_ignore(event.src_path):
return
def on_moved(self, event):
"""Handle file move/rename events"""
try:
# File modification could indicate download completion
file_size = os.path.getsize(event.src_path)
if file_size > 0:
logger.debug(f"LoRA file modified: {event.src_path} (size: {file_size} bytes)")
# Update the ignore timeout based on the new size
self.add_ignore_path(event.src_path, file_size)
# Schedule an update to add the file once the ignore period expires
self._schedule_update('add', event.src_path)
except FileNotFoundError:
# File disappeared - ignore
pass
except Exception as e:
logger.error(f"Error processing modify event for {event.src_path}: {str(e)}")
# If destination is a safetensors file, treat it as a new file
if event.dest_path.endswith('.safetensors'):
if self._should_ignore(event.dest_path):
return
normalized_path = os.path.realpath(event.dest_path).replace(os.sep, '/')
# Only process if not already scheduled
if normalized_path not in self.scheduled_files:
logger.info(f"LoRA file renamed/moved to: {event.dest_path}")
self.scheduled_files.add(normalized_path)
self._schedule_update('add', event.dest_path)
# Auto-remove from scheduled list after reasonable time
self.loop.call_later(
self.debounce_delay * 2,
self.scheduled_files.discard,
normalized_path
)
# If source was a safetensors file, treat it as deleted
if event.src_path.endswith('.safetensors'):
if self._should_ignore(event.src_path):
return
normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/')
self.scheduled_files.discard(normalized_path)
logger.info(f"LoRA file moved/renamed from: {event.src_path}")
self._schedule_update('remove', event.src_path)
def _schedule_update(self, action: str, file_path: str): #file_path is a real path
"""Schedule a cache update"""
with self.lock:
# Store the real path rather than trying to map it here
# This ensures we have the actual file system path when checking existence later
self.pending_changes.add((action, file_path))
# 使用 config 中的方法映射路径
mapped_path = config.map_path_to_link(file_path)
normalized_path = mapped_path.replace(os.sep, '/')
self.pending_changes.add((action, normalized_path))
self.loop.call_soon_threadsafe(self._create_update_task)
@@ -161,8 +197,8 @@ class LoraFileHandler(FileSystemEventHandler):
if self.update_task is None or self.update_task.done():
self.update_task = asyncio.create_task(self._process_changes())
async def _process_changes(self, delay: float = 5.0):
"""Process pending changes with debouncing - increased delay to allow downloads to complete"""
async def _process_changes(self, delay: float = 2.0):
"""Process pending changes with debouncing"""
await asyncio.sleep(delay)
try:
@@ -175,40 +211,21 @@ class LoraFileHandler(FileSystemEventHandler):
logger.info(f"Processing {len(changes)} file changes")
# First collect all actions by file path to handle contradicting events
actions_by_path = {}
for action, file_path in changes:
# For the same file path, 'add' takes precedence over 'remove'
if file_path not in actions_by_path or action == 'add':
actions_by_path[file_path] = action
# Process the final actions
cache = await self.scanner.get_cached_data()
needs_resort = False
new_folders = set()
for file_path, action in actions_by_path.items():
for action, file_path in changes:
try:
# For 'add' actions, verify the file still exists and is complete
if action == 'add':
# Use the original real path from the event for file system checks
real_path = file_path
if not os.path.exists(real_path):
logger.warning(f"Skipping add for non-existent file: {real_path}")
# Check if file already exists in cache
existing = next((item for item in cache.raw_data if item['file_path'] == file_path), None)
if existing:
logger.info(f"File {file_path} already in cache, skipping")
continue
file_size = os.path.getsize(real_path)
if file_size == 0:
logger.warning(f"Skipping add for empty file: {real_path}")
continue
# Map the real path to link path for the cache after confirming file exists
mapped_path = config.map_path_to_link(real_path)
normalized_path = mapped_path.replace(os.sep, '/')
# Scan new file with the mapped path
lora_data = await self.scanner.scan_single_lora(normalized_path)
# Scan new file
lora_data = await self.scanner.scan_single_lora(file_path)
if lora_data:
# Update tags count
for tag in lora_data.get('tags', []):
@@ -223,27 +240,10 @@ class LoraFileHandler(FileSystemEventHandler):
lora_data['file_path']
)
needs_resort = True
logger.info(f"Added LoRA to cache: {normalized_path}")
# Remove from ignore list now that it's been successfully processed
# This allows delete events to be processed immediately
real_path_normalized = os.path.realpath(real_path).replace(os.sep, '/')
alt_path = real_path_normalized.replace('/', '\\')
if real_path_normalized in self._ignore_paths:
logger.debug(f"Removing successfully processed file from ignore list: {real_path_normalized}")
del self._ignore_paths[real_path_normalized]
if alt_path in self._ignore_paths:
del self._ignore_paths[alt_path]
elif action == 'remove':
# Map the path for removal operations
mapped_path = config.map_path_to_link(file_path)
normalized_path = mapped_path.replace(os.sep, '/')
# Find the lora to remove so we can update tags count
lora_to_remove = next((item for item in cache.raw_data if item['file_path'] == normalized_path), None)
lora_to_remove = next((item for item in cache.raw_data if item['file_path'] == file_path), None)
if lora_to_remove:
# Update tags count by reducing counts
for tag in lora_to_remove.get('tags', []):
@@ -253,16 +253,16 @@ class LoraFileHandler(FileSystemEventHandler):
del self.scanner._tags_count[tag]
# Remove from cache and hash index
logger.info(f"Removing {normalized_path} from cache")
self.scanner._hash_index.remove_by_path(normalized_path)
logger.info(f"Removing {file_path} from cache")
self.scanner._hash_index.remove_by_path(file_path)
cache.raw_data = [
item for item in cache.raw_data
if item['file_path'] != normalized_path
if item['file_path'] != file_path
]
needs_resort = True
except Exception as e:
logger.error(f"Error processing {action} for {file_path}: {e}", exc_info=True)
logger.error(f"Error processing {action} for {file_path}: {e}")
if needs_resort:
await cache.resort()
@@ -272,7 +272,7 @@ class LoraFileHandler(FileSystemEventHandler):
cache.folders = sorted(list(all_folders), key=lambda x: x.lower())
except Exception as e:
logger.error(f"Error in process_changes: {e}", exc_info=True)
logger.error(f"Error in process_changes: {e}")
class LoraFileMonitor: