From f2d36f5be9bd96ad7006db9c19f5cdab62350937 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Sun, 6 Apr 2025 22:27:55 +0800 Subject: [PATCH] Refactor DownloadManager and LoraFileHandler for improved file monitoring - Simplified the path handling in DownloadManager by directly adding normalized paths to the ignore list. - Updated LoraFileHandler to utilize a set for ignore paths, enhancing performance and clarity. - Implemented debouncing for modified file events to prevent duplicate processing and improve efficiency. - Enhanced the handling of file creation, modification, and deletion events for .safetensors files, ensuring accurate processing and logging. - Adjusted cache operations to streamline the addition and removal of files based on real paths. --- py/services/download_manager.py | 15 +- py/services/file_monitor.py | 304 ++++++++++++++++---------------- 2 files changed, 156 insertions(+), 163 deletions(-) diff --git a/py/services/download_manager.py b/py/services/download_manager.py index fddea0b5..df0d3af2 100644 --- a/py/services/download_manager.py +++ b/py/services/download_manager.py @@ -75,17 +75,10 @@ class DownloadManager: file_size = file_info.get('sizeKB', 0) * 1024 # 4. 通知文件监控系统 - 使用规范化路径和文件大小 - if self.file_monitor and self.file_monitor.handler: - # Add both the normalized path and potential alternative paths - normalized_path = save_path.replace(os.sep, '/') - self.file_monitor.handler.add_ignore_path(normalized_path, file_size) - - # Also add the path with file extension variations (.safetensors) - if not normalized_path.endswith('.safetensors'): - safetensors_path = os.path.splitext(normalized_path)[0] + '.safetensors' - self.file_monitor.handler.add_ignore_path(safetensors_path, file_size) - - logger.debug(f"Added download path to ignore list: {normalized_path} (size: {file_size} bytes)") + self.file_monitor.handler.add_ignore_path( + save_path.replace(os.sep, '/'), + file_size + ) # 5. 准备元数据 metadata = LoraMetadata.from_civitai_info(version_info, file_info, save_path) diff --git a/py/services/file_monitor.py b/py/services/file_monitor.py index 0124402d..9ed44d0f 100644 --- a/py/services/file_monitor.py +++ b/py/services/file_monitor.py @@ -2,9 +2,10 @@ from operator import itemgetter import os import logging import asyncio +import time from watchdog.observers import Observer -from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileDeletedEvent -from typing import List +from watchdog.events import FileSystemEventHandler +from typing import List, Dict, Set from threading import Lock from .lora_scanner import LoraScanner from ..config import config @@ -20,139 +21,174 @@ class LoraFileHandler(FileSystemEventHandler): self.pending_changes = set() # 待处理的变更 self.lock = Lock() # 线程安全锁 self.update_task = None # 异步更新任务 - self._ignore_paths = {} # Change to dictionary to store expiration times + self._ignore_paths = set() # Add ignore paths set self._min_ignore_timeout = 5 # minimum timeout in seconds self._download_speed = 1024 * 1024 # assume 1MB/s as base speed + + # Track modified files with timestamps for debouncing + self.modified_files: Dict[str, float] = {} + self.debounce_timer = None + self.debounce_delay = 3.0 # seconds to wait after last modification + + # Track files that are already scheduled for processing + self.scheduled_files: Set[str] = set() def _should_ignore(self, path: str) -> bool: """Check if path should be ignored""" real_path = os.path.realpath(path) # Resolve any symbolic links - normalized_path = real_path.replace(os.sep, '/') - - # Also check with backslashes for Windows compatibility - alt_path = real_path.replace('/', '\\') - - # 使用传入的事件循环而不是尝试获取当前线程的事件循环 - current_time = self.loop.time() - - # Check if path is in ignore list and not expired - if normalized_path in self._ignore_paths and self._ignore_paths[normalized_path] > current_time: - return True - - # Also check alternative path format - if alt_path in self._ignore_paths and self._ignore_paths[alt_path] > current_time: - return True - - return False + return real_path.replace(os.sep, '/') in self._ignore_paths def add_ignore_path(self, path: str, file_size: int = 0): """Add path to ignore list with dynamic timeout based on file size""" real_path = os.path.realpath(path) # Resolve any symbolic links - normalized_path = real_path.replace(os.sep, '/') + self._ignore_paths.add(real_path.replace(os.sep, '/')) - # Calculate timeout based on file size - # For small files, use minimum timeout - # For larger files, estimate download time + buffer - if file_size > 0: - # Estimate download time in seconds (size / speed) + buffer - estimated_time = (file_size / self._download_speed) + 10 - timeout = max(self._min_ignore_timeout, estimated_time) - else: - timeout = self._min_ignore_timeout - - current_time = self.loop.time() - expiration_time = current_time + timeout - - # Store both normalized and alternative path formats - self._ignore_paths[normalized_path] = expiration_time - - # Also store with backslashes for Windows compatibility - alt_path = real_path.replace('/', '\\') - self._ignore_paths[alt_path] = expiration_time - - logger.debug(f"Added ignore path: {normalized_path} (expires in {timeout:.1f}s)") + # Short timeout (e.g. 5 seconds) is sufficient to ignore the CREATE event + timeout = 5 self.loop.call_later( timeout, - self._remove_ignore_path, - normalized_path + self._ignore_paths.discard, + real_path.replace(os.sep, '/') ) - - def _remove_ignore_path(self, path: str): - """Remove path from ignore list after timeout""" - if path in self._ignore_paths: - del self._ignore_paths[path] - logger.debug(f"Removed ignore path: {path}") - # Also remove alternative path format - alt_path = path.replace('/', '\\') - if alt_path in self._ignore_paths: - del self._ignore_paths[alt_path] - def on_created(self, event): - if event.is_directory or not event.src_path.endswith('.safetensors'): + if event.is_directory: return - if self._should_ignore(event.src_path): - return - - # Check if file is still being downloaded - try: - file_size = os.path.getsize(event.src_path) - # Record the file path and size to handle potential deletion during download - self.add_ignore_path(event.src_path, file_size) - # Only process file if it exists and has non-zero size - if os.path.exists(event.src_path) and file_size > 0: - logger.info(f"LoRA file created: {event.src_path} (size: {file_size} bytes)") + # Handle safetensors files directly + if event.src_path.endswith('.safetensors'): + if self._should_ignore(event.src_path): + return + + # We'll process this file directly and ignore subsequent modifications + # to prevent duplicate processing + normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/') + if normalized_path not in self.scheduled_files: + logger.info(f"LoRA file created: {event.src_path}") + self.scheduled_files.add(normalized_path) self._schedule_update('add', event.src_path) - else: - logger.debug(f"Ignoring empty or non-existent file: {event.src_path}") - except FileNotFoundError: - # File disappeared between event and our check - likely a temporary download file - logger.debug(f"File disappeared before processing: {event.src_path}") - except Exception as e: - logger.error(f"Error processing create event for {event.src_path}: {str(e)}") + + # Ignore modifications for a short period after creation + # This helps avoid duplicate processing + self.loop.call_later( + self.debounce_delay * 2, + self.scheduled_files.discard, + normalized_path + ) + + # For browser downloads, we'll catch them when they're renamed to .safetensors + + def on_modified(self, event): + if event.is_directory: + return + + # Only process safetensors files + if event.src_path.endswith('.safetensors'): + if self._should_ignore(event.src_path): + return + + normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/') + + # Skip if this file is already scheduled for processing + if normalized_path in self.scheduled_files: + return + + # Update the timestamp for this file + self.modified_files[normalized_path] = time.time() + + # Cancel any existing timer + if self.debounce_timer: + self.debounce_timer.cancel() + + # Set a new timer to process modified files after debounce period + self.debounce_timer = self.loop.call_later( + self.debounce_delay, + self.loop.call_soon_threadsafe, + self._process_modified_files + ) + + def _process_modified_files(self): + """Process files that have been modified after debounce period""" + current_time = time.time() + files_to_process = [] + + # Find files that haven't been modified for debounce_delay seconds + for file_path, last_modified in list(self.modified_files.items()): + if current_time - last_modified >= self.debounce_delay: + # Only process if not already scheduled + if file_path not in self.scheduled_files: + files_to_process.append(file_path) + self.scheduled_files.add(file_path) + + # Auto-remove from scheduled list after reasonable time + self.loop.call_later( + self.debounce_delay * 2, + self.scheduled_files.discard, + file_path + ) + + del self.modified_files[file_path] + + # Process stable files + for file_path in files_to_process: + logger.info(f"Processing modified LoRA file: {file_path}") + self._schedule_update('add', file_path) def on_deleted(self, event): if event.is_directory or not event.src_path.endswith('.safetensors'): return - - # If this path is in our ignore list, it might be part of a download process - # Don't remove it from the cache yet if self._should_ignore(event.src_path): - logger.debug(f"Ignoring delete event for in-progress download: {event.src_path}") return - + + # Remove from scheduled files if present + normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/') + self.scheduled_files.discard(normalized_path) + logger.info(f"LoRA file deleted: {event.src_path}") self._schedule_update('remove', event.src_path) - def on_modified(self, event): - if event.is_directory or not event.src_path.endswith('.safetensors'): - return - if self._should_ignore(event.src_path): - return + def on_moved(self, event): + """Handle file move/rename events""" - try: - # File modification could indicate download completion - file_size = os.path.getsize(event.src_path) - if file_size > 0: - logger.debug(f"LoRA file modified: {event.src_path} (size: {file_size} bytes)") - # Update the ignore timeout based on the new size - self.add_ignore_path(event.src_path, file_size) - # Schedule an update to add the file once the ignore period expires - self._schedule_update('add', event.src_path) - except FileNotFoundError: - # File disappeared - ignore - pass - except Exception as e: - logger.error(f"Error processing modify event for {event.src_path}: {str(e)}") + # If destination is a safetensors file, treat it as a new file + if event.dest_path.endswith('.safetensors'): + if self._should_ignore(event.dest_path): + return + + normalized_path = os.path.realpath(event.dest_path).replace(os.sep, '/') + + # Only process if not already scheduled + if normalized_path not in self.scheduled_files: + logger.info(f"LoRA file renamed/moved to: {event.dest_path}") + self.scheduled_files.add(normalized_path) + self._schedule_update('add', event.dest_path) + + # Auto-remove from scheduled list after reasonable time + self.loop.call_later( + self.debounce_delay * 2, + self.scheduled_files.discard, + normalized_path + ) + + # If source was a safetensors file, treat it as deleted + if event.src_path.endswith('.safetensors'): + if self._should_ignore(event.src_path): + return + + normalized_path = os.path.realpath(event.src_path).replace(os.sep, '/') + self.scheduled_files.discard(normalized_path) + + logger.info(f"LoRA file moved/renamed from: {event.src_path}") + self._schedule_update('remove', event.src_path) def _schedule_update(self, action: str, file_path: str): #file_path is a real path """Schedule a cache update""" with self.lock: - # Store the real path rather than trying to map it here - # This ensures we have the actual file system path when checking existence later - self.pending_changes.add((action, file_path)) + # 使用 config 中的方法映射路径 + mapped_path = config.map_path_to_link(file_path) + normalized_path = mapped_path.replace(os.sep, '/') + self.pending_changes.add((action, normalized_path)) self.loop.call_soon_threadsafe(self._create_update_task) @@ -161,8 +197,8 @@ class LoraFileHandler(FileSystemEventHandler): if self.update_task is None or self.update_task.done(): self.update_task = asyncio.create_task(self._process_changes()) - async def _process_changes(self, delay: float = 5.0): - """Process pending changes with debouncing - increased delay to allow downloads to complete""" + async def _process_changes(self, delay: float = 2.0): + """Process pending changes with debouncing""" await asyncio.sleep(delay) try: @@ -175,40 +211,21 @@ class LoraFileHandler(FileSystemEventHandler): logger.info(f"Processing {len(changes)} file changes") - # First collect all actions by file path to handle contradicting events - actions_by_path = {} - for action, file_path in changes: - # For the same file path, 'add' takes precedence over 'remove' - if file_path not in actions_by_path or action == 'add': - actions_by_path[file_path] = action - - # Process the final actions cache = await self.scanner.get_cached_data() needs_resort = False new_folders = set() - for file_path, action in actions_by_path.items(): + for action, file_path in changes: try: - # For 'add' actions, verify the file still exists and is complete if action == 'add': - # Use the original real path from the event for file system checks - real_path = file_path - - if not os.path.exists(real_path): - logger.warning(f"Skipping add for non-existent file: {real_path}") + # Check if file already exists in cache + existing = next((item for item in cache.raw_data if item['file_path'] == file_path), None) + if existing: + logger.info(f"File {file_path} already in cache, skipping") continue - file_size = os.path.getsize(real_path) - if file_size == 0: - logger.warning(f"Skipping add for empty file: {real_path}") - continue - - # Map the real path to link path for the cache after confirming file exists - mapped_path = config.map_path_to_link(real_path) - normalized_path = mapped_path.replace(os.sep, '/') - - # Scan new file with the mapped path - lora_data = await self.scanner.scan_single_lora(normalized_path) + # Scan new file + lora_data = await self.scanner.scan_single_lora(file_path) if lora_data: # Update tags count for tag in lora_data.get('tags', []): @@ -223,27 +240,10 @@ class LoraFileHandler(FileSystemEventHandler): lora_data['file_path'] ) needs_resort = True - logger.info(f"Added LoRA to cache: {normalized_path}") - - # Remove from ignore list now that it's been successfully processed - # This allows delete events to be processed immediately - real_path_normalized = os.path.realpath(real_path).replace(os.sep, '/') - alt_path = real_path_normalized.replace('/', '\\') - - if real_path_normalized in self._ignore_paths: - logger.debug(f"Removing successfully processed file from ignore list: {real_path_normalized}") - del self._ignore_paths[real_path_normalized] - - if alt_path in self._ignore_paths: - del self._ignore_paths[alt_path] elif action == 'remove': - # Map the path for removal operations - mapped_path = config.map_path_to_link(file_path) - normalized_path = mapped_path.replace(os.sep, '/') - # Find the lora to remove so we can update tags count - lora_to_remove = next((item for item in cache.raw_data if item['file_path'] == normalized_path), None) + lora_to_remove = next((item for item in cache.raw_data if item['file_path'] == file_path), None) if lora_to_remove: # Update tags count by reducing counts for tag in lora_to_remove.get('tags', []): @@ -253,16 +253,16 @@ class LoraFileHandler(FileSystemEventHandler): del self.scanner._tags_count[tag] # Remove from cache and hash index - logger.info(f"Removing {normalized_path} from cache") - self.scanner._hash_index.remove_by_path(normalized_path) + logger.info(f"Removing {file_path} from cache") + self.scanner._hash_index.remove_by_path(file_path) cache.raw_data = [ item for item in cache.raw_data - if item['file_path'] != normalized_path + if item['file_path'] != file_path ] needs_resort = True except Exception as e: - logger.error(f"Error processing {action} for {file_path}: {e}", exc_info=True) + logger.error(f"Error processing {action} for {file_path}: {e}") if needs_resort: await cache.resort() @@ -272,7 +272,7 @@ class LoraFileHandler(FileSystemEventHandler): cache.folders = sorted(list(all_folders), key=lambda x: x.lower()) except Exception as e: - logger.error(f"Error in process_changes: {e}", exc_info=True) + logger.error(f"Error in process_changes: {e}") class LoraFileMonitor: