From 8314b9bedb67be3ef8d3caa8d0a56b4ef054f6ba Mon Sep 17 00:00:00 2001 From: Will Miao Date: Wed, 17 Jun 2026 23:04:30 +0800 Subject: [PATCH] feat(downloads): add /downloads/queue/status endpoint and integrate queue lifecycle - New GET /api/lm/downloads/queue/status handler for non-terminal status transitions (queued -> downloading, downloading -> paused, etc.) - Queue lifecycle auto-integration in DownloadManager._download_with_semaphore: downloading -> SQLite update_status('downloading') on semaphore acquire completed -> complete_download('completed') on success canceled -> complete_download('canceled') on CancelledError failed -> complete_download('failed') on Exception - All queue operations wrapped in try/except to never break the download flow --- py/routes/handlers/model_handlers.py | 34 +++++++++++++++++ py/routes/model_route_registrar.py | 3 ++ py/services/download_manager.py | 55 ++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+) diff --git a/py/routes/handlers/model_handlers.py b/py/routes/handlers/model_handlers.py index 6b0c9b4b..89c64c8e 100644 --- a/py/routes/handlers/model_handlers.py +++ b/py/routes/handlers/model_handlers.py @@ -1820,6 +1820,39 @@ class ModelDownloadHandler: ) return web.json_response({"success": False, "error": str(exc)}, status=500) + async def update_download_queue_status(self, request: web.Request) -> web.Response: + """Update the status of a queue item (non-terminal transitions). + + Supported transitions include ``queued → downloading``, + ``downloading → paused``, ``paused → downloading``, etc. + Terminal transitions (``completed``, ``failed``, ``canceled``) + should use ``complete_download_in_queue`` instead. + """ + try: + download_id = request.query.get("download_id") + status = request.query.get("status") + if not download_id or not status: + return web.json_response( + { + "success": False, + "error": "download_id and status are required", + }, + status=400, + ) + service = await DownloadQueueService.get_instance() + updated = await service.update_status(download_id, status) + if not updated: + return web.json_response( + {"success": False, "error": "Download not found in queue"}, + status=404, + ) + return web.json_response({"success": True}) + except Exception as exc: + self._logger.error( + "Error updating download queue status: %s", exc, exc_info=True + ) + return web.json_response({"success": False, "error": str(exc)}, status=500) + class ModelCivitaiHandler: """CivitAI integration endpoints.""" @@ -2864,6 +2897,7 @@ class ModelHandlerSet: "retry_all_failed_downloads": self.download.retry_all_failed_downloads, "complete_download_in_queue": self.download.complete_download_in_queue, "get_download_stats": self.download.get_download_stats, + "update_download_queue_status": self.download.update_download_queue_status, "get_civitai_versions": self.civitai.get_civitai_versions, "get_civitai_model_by_version": self.civitai.get_civitai_model_by_version, "get_civitai_model_by_hash": self.civitai.get_civitai_model_by_hash, diff --git a/py/routes/model_route_registrar.py b/py/routes/model_route_registrar.py index c7c98f46..9373f4f5 100644 --- a/py/routes/model_route_registrar.py +++ b/py/routes/model_route_registrar.py @@ -138,6 +138,9 @@ COMMON_ROUTE_DEFINITIONS: tuple[RouteDefinition, ...] = ( RouteDefinition( "GET", "/api/lm/downloads/queue/complete", "complete_download_in_queue" ), + RouteDefinition( + "GET", "/api/lm/downloads/queue/status", "update_download_queue_status" + ), RouteDefinition("POST", "/api/lm/{prefix}/cancel-task", "cancel_task"), RouteDefinition("GET", "/{prefix}", "handle_models_page"), ) diff --git a/py/services/download_manager.py b/py/services/download_manager.py index fe79121f..3e395610 100644 --- a/py/services/download_manager.py +++ b/py/services/download_manager.py @@ -29,6 +29,7 @@ from .metadata_service import get_default_metadata_provider, get_metadata_provid from .downloader import get_downloader, DownloadProgress, DownloadStreamControl from .aria2_downloader import Aria2Error, get_aria2_downloader from .aria2_transfer_state import Aria2TransferStateStore +from .download_queue_service import DownloadQueueService # Download to temporary file first import tempfile @@ -360,6 +361,15 @@ class DownloadManager: if self._active_downloads[task_id].get("transfer_backend") == "aria2": await self._persist_aria2_state(task_id) + # Update SQLite queue status to 'downloading' + try: + queue_service = await DownloadQueueService.get_instance() + await queue_service.update_status(task_id, "downloading") + except Exception: + logger.warning( + "Failed to update queue status for %s", task_id, exc_info=True + ) + # Use original download implementation try: # Check for cancellation before starting @@ -396,6 +406,22 @@ class DownloadManager: if self._active_downloads[task_id].get("transfer_backend") == "aria2": await self._persist_aria2_state(task_id) + # Move queue item to history on completion + try: + queue_service = await DownloadQueueService.get_instance() + await queue_service.complete_download( + download_id=task_id, + status=result.get("status", "completed") if result.get("success") else "failed", + error=result.get("error") if not result.get("success") else None, + file_path=result.get("file_path"), + bytes_downloaded=self._active_downloads.get(task_id, {}).get("bytes_downloaded", 0), + total_bytes=self._active_downloads.get(task_id, {}).get("total_bytes"), + ) + except Exception: + logger.warning( + "Failed to complete queue item for %s", task_id, exc_info=True + ) + return result except asyncio.CancelledError: # Handle cancellation @@ -404,6 +430,19 @@ class DownloadManager: self._active_downloads[task_id]["bytes_per_second"] = 0.0 if self._active_downloads[task_id].get("transfer_backend") == "aria2": await self._persist_aria2_state(task_id) + + # Move queue item to history as canceled + try: + queue_service = await DownloadQueueService.get_instance() + await queue_service.complete_download( + download_id=task_id, + status="canceled", + ) + except Exception: + logger.warning( + "Failed to cancel queue item for %s", task_id, exc_info=True + ) + logger.info(f"Download cancelled for task {task_id}") raise except Exception as e: @@ -417,6 +456,22 @@ class DownloadManager: self._active_downloads[task_id]["bytes_per_second"] = 0.0 if self._active_downloads[task_id].get("transfer_backend") == "aria2": await self._persist_aria2_state(task_id) + + # Move queue item to history as failed + try: + queue_service = await DownloadQueueService.get_instance() + await queue_service.complete_download( + download_id=task_id, + status="failed", + error=str(e), + bytes_downloaded=self._active_downloads.get(task_id, {}).get("bytes_downloaded", 0), + total_bytes=self._active_downloads.get(task_id, {}).get("total_bytes"), + ) + except Exception: + logger.warning( + "Failed to complete queue item for %s", task_id, exc_info=True + ) + return {"success": False, "error": str(e)} finally: # Schedule cleanup of download record after delay