From 1814f83bee34adf96c6afdcf11fc61febd4a30e6 Mon Sep 17 00:00:00 2001 From: Will Miao <13051207myq@gmail.com> Date: Mon, 25 Aug 2025 09:03:40 +0800 Subject: [PATCH] feat: Implement post-initialization tasks and backup file cleanup in LoraManager --- py/lora_manager.py | 139 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 135 insertions(+), 4 deletions(-) diff --git a/py/lora_manager.py b/py/lora_manager.py index 0c2f5e1d..3a372a02 100644 --- a/py/lora_manager.py +++ b/py/lora_manager.py @@ -198,18 +198,149 @@ class LoraManager: recipe_scanner = await ServiceRegistry.get_recipe_scanner() # Create low-priority initialization tasks - asyncio.create_task(lora_scanner.initialize_in_background(), name='lora_cache_init') - asyncio.create_task(checkpoint_scanner.initialize_in_background(), name='checkpoint_cache_init') - asyncio.create_task(embedding_scanner.initialize_in_background(), name='embedding_cache_init') - asyncio.create_task(recipe_scanner.initialize_in_background(), name='recipe_cache_init') + init_tasks = [ + asyncio.create_task(lora_scanner.initialize_in_background(), name='lora_cache_init'), + asyncio.create_task(checkpoint_scanner.initialize_in_background(), name='checkpoint_cache_init'), + asyncio.create_task(embedding_scanner.initialize_in_background(), name='embedding_cache_init'), + asyncio.create_task(recipe_scanner.initialize_in_background(), name='recipe_cache_init') + ] await ExampleImagesMigration.check_and_run_migrations() + # Schedule post-initialization tasks to run after scanners complete + asyncio.create_task( + cls._run_post_initialization_tasks(init_tasks), + name='post_init_tasks' + ) + logger.info("LoRA Manager: All services initialized and background tasks scheduled") except Exception as e: logger.error(f"LoRA Manager: Error initializing services: {e}", exc_info=True) + @classmethod + async def _run_post_initialization_tasks(cls, init_tasks): + """Run post-initialization tasks after all scanners complete""" + try: + logger.debug("LoRA Manager: Waiting for scanner initialization to complete...") + + # Wait for all scanner initialization tasks to complete + await asyncio.gather(*init_tasks, return_exceptions=True) + + logger.debug("LoRA Manager: Scanner initialization completed, starting post-initialization tasks...") + + # Run post-initialization tasks + post_tasks = [ + asyncio.create_task(cls._cleanup_backup_files(), name='cleanup_bak_files'), + # Add more post-initialization tasks here as needed + # asyncio.create_task(cls._another_post_task(), name='another_task'), + ] + + # Run all post-initialization tasks + results = await asyncio.gather(*post_tasks, return_exceptions=True) + + # Log results + for i, result in enumerate(results): + task_name = post_tasks[i].get_name() + if isinstance(result, Exception): + logger.error(f"Post-initialization task '{task_name}' failed: {result}") + else: + logger.debug(f"Post-initialization task '{task_name}' completed successfully") + + logger.debug("LoRA Manager: All post-initialization tasks completed") + + except Exception as e: + logger.error(f"LoRA Manager: Error in post-initialization tasks: {e}", exc_info=True) + + @classmethod + async def _cleanup_backup_files(cls): + """Clean up .bak files in all model roots""" + try: + logger.debug("Starting cleanup of .bak files in model directories...") + + # Collect all model roots + all_roots = set() + all_roots.update(config.loras_roots) + all_roots.update(config.base_models_roots) + all_roots.update(config.embeddings_roots) + + total_deleted = 0 + total_size_freed = 0 + + for root_path in all_roots: + if not os.path.exists(root_path): + continue + + try: + deleted_count, size_freed = await cls._cleanup_backup_files_in_directory(root_path) + total_deleted += deleted_count + total_size_freed += size_freed + + if deleted_count > 0: + logger.debug(f"Cleaned up {deleted_count} .bak files in {root_path} (freed {size_freed / (1024*1024):.2f} MB)") + + except Exception as e: + logger.error(f"Error cleaning up .bak files in {root_path}: {e}") + + # Yield control periodically + await asyncio.sleep(0.01) + + if total_deleted > 0: + logger.debug(f"Backup cleanup completed: removed {total_deleted} .bak files, freed {total_size_freed / (1024*1024):.2f} MB total") + else: + logger.debug("Backup cleanup completed: no .bak files found") + + except Exception as e: + logger.error(f"Error during backup file cleanup: {e}", exc_info=True) + + @classmethod + async def _cleanup_backup_files_in_directory(cls, directory_path: str): + """Clean up .bak files in a specific directory recursively + + Args: + directory_path: Path to the directory to clean + + Returns: + Tuple[int, int]: (number of files deleted, total size freed in bytes) + """ + deleted_count = 0 + size_freed = 0 + visited_paths = set() + + def cleanup_recursive(path): + nonlocal deleted_count, size_freed + + try: + real_path = os.path.realpath(path) + if real_path in visited_paths: + return + visited_paths.add(real_path) + + with os.scandir(path) as it: + for entry in it: + try: + if entry.is_file(follow_symlinks=True) and entry.name.endswith('.bak'): + file_size = entry.stat().st_size + os.remove(entry.path) + deleted_count += 1 + size_freed += file_size + logger.debug(f"Deleted .bak file: {entry.path}") + + elif entry.is_dir(follow_symlinks=True): + cleanup_recursive(entry.path) + + except Exception as e: + logger.warning(f"Could not delete .bak file {entry.path}: {e}") + + except Exception as e: + logger.error(f"Error scanning directory {path} for .bak files: {e}") + + # Run the recursive cleanup in a thread pool to avoid blocking + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, cleanup_recursive, directory_path) + + return deleted_count, size_freed + @classmethod async def _cleanup(cls, app): """Cleanup resources using ServiceRegistry"""