feat(downloads): add /downloads/queue/status endpoint and integrate queue lifecycle

- New GET /api/lm/downloads/queue/status handler for non-terminal status
  transitions (queued -> downloading, downloading -> paused, etc.)
- Queue lifecycle auto-integration in DownloadManager._download_with_semaphore:
  downloading -> SQLite update_status('downloading') on semaphore acquire
  completed -> complete_download('completed') on success
  canceled -> complete_download('canceled') on CancelledError
  failed -> complete_download('failed') on Exception
- All queue operations wrapped in try/except to never break the download flow
This commit is contained in:
Will Miao
2026-06-17 23:04:30 +08:00
parent 75298a402f
commit 8314b9bedb
3 changed files with 92 additions and 0 deletions

View File

@@ -1820,6 +1820,39 @@ class ModelDownloadHandler:
) )
return web.json_response({"success": False, "error": str(exc)}, status=500) return web.json_response({"success": False, "error": str(exc)}, status=500)
async def update_download_queue_status(self, request: web.Request) -> web.Response:
"""Update the status of a queue item (non-terminal transitions).
Supported transitions include ``queued → downloading``,
``downloading → paused``, ``paused → downloading``, etc.
Terminal transitions (``completed``, ``failed``, ``canceled``)
should use ``complete_download_in_queue`` instead.
"""
try:
download_id = request.query.get("download_id")
status = request.query.get("status")
if not download_id or not status:
return web.json_response(
{
"success": False,
"error": "download_id and status are required",
},
status=400,
)
service = await DownloadQueueService.get_instance()
updated = await service.update_status(download_id, status)
if not updated:
return web.json_response(
{"success": False, "error": "Download not found in queue"},
status=404,
)
return web.json_response({"success": True})
except Exception as exc:
self._logger.error(
"Error updating download queue status: %s", exc, exc_info=True
)
return web.json_response({"success": False, "error": str(exc)}, status=500)
class ModelCivitaiHandler: class ModelCivitaiHandler:
"""CivitAI integration endpoints.""" """CivitAI integration endpoints."""
@@ -2864,6 +2897,7 @@ class ModelHandlerSet:
"retry_all_failed_downloads": self.download.retry_all_failed_downloads, "retry_all_failed_downloads": self.download.retry_all_failed_downloads,
"complete_download_in_queue": self.download.complete_download_in_queue, "complete_download_in_queue": self.download.complete_download_in_queue,
"get_download_stats": self.download.get_download_stats, "get_download_stats": self.download.get_download_stats,
"update_download_queue_status": self.download.update_download_queue_status,
"get_civitai_versions": self.civitai.get_civitai_versions, "get_civitai_versions": self.civitai.get_civitai_versions,
"get_civitai_model_by_version": self.civitai.get_civitai_model_by_version, "get_civitai_model_by_version": self.civitai.get_civitai_model_by_version,
"get_civitai_model_by_hash": self.civitai.get_civitai_model_by_hash, "get_civitai_model_by_hash": self.civitai.get_civitai_model_by_hash,

View File

@@ -138,6 +138,9 @@ COMMON_ROUTE_DEFINITIONS: tuple[RouteDefinition, ...] = (
RouteDefinition( RouteDefinition(
"GET", "/api/lm/downloads/queue/complete", "complete_download_in_queue" "GET", "/api/lm/downloads/queue/complete", "complete_download_in_queue"
), ),
RouteDefinition(
"GET", "/api/lm/downloads/queue/status", "update_download_queue_status"
),
RouteDefinition("POST", "/api/lm/{prefix}/cancel-task", "cancel_task"), RouteDefinition("POST", "/api/lm/{prefix}/cancel-task", "cancel_task"),
RouteDefinition("GET", "/{prefix}", "handle_models_page"), RouteDefinition("GET", "/{prefix}", "handle_models_page"),
) )

View File

@@ -29,6 +29,7 @@ from .metadata_service import get_default_metadata_provider, get_metadata_provid
from .downloader import get_downloader, DownloadProgress, DownloadStreamControl from .downloader import get_downloader, DownloadProgress, DownloadStreamControl
from .aria2_downloader import Aria2Error, get_aria2_downloader from .aria2_downloader import Aria2Error, get_aria2_downloader
from .aria2_transfer_state import Aria2TransferStateStore from .aria2_transfer_state import Aria2TransferStateStore
from .download_queue_service import DownloadQueueService
# Download to temporary file first # Download to temporary file first
import tempfile import tempfile
@@ -360,6 +361,15 @@ class DownloadManager:
if self._active_downloads[task_id].get("transfer_backend") == "aria2": if self._active_downloads[task_id].get("transfer_backend") == "aria2":
await self._persist_aria2_state(task_id) await self._persist_aria2_state(task_id)
# Update SQLite queue status to 'downloading'
try:
queue_service = await DownloadQueueService.get_instance()
await queue_service.update_status(task_id, "downloading")
except Exception:
logger.warning(
"Failed to update queue status for %s", task_id, exc_info=True
)
# Use original download implementation # Use original download implementation
try: try:
# Check for cancellation before starting # Check for cancellation before starting
@@ -396,6 +406,22 @@ class DownloadManager:
if self._active_downloads[task_id].get("transfer_backend") == "aria2": if self._active_downloads[task_id].get("transfer_backend") == "aria2":
await self._persist_aria2_state(task_id) await self._persist_aria2_state(task_id)
# Move queue item to history on completion
try:
queue_service = await DownloadQueueService.get_instance()
await queue_service.complete_download(
download_id=task_id,
status=result.get("status", "completed") if result.get("success") else "failed",
error=result.get("error") if not result.get("success") else None,
file_path=result.get("file_path"),
bytes_downloaded=self._active_downloads.get(task_id, {}).get("bytes_downloaded", 0),
total_bytes=self._active_downloads.get(task_id, {}).get("total_bytes"),
)
except Exception:
logger.warning(
"Failed to complete queue item for %s", task_id, exc_info=True
)
return result return result
except asyncio.CancelledError: except asyncio.CancelledError:
# Handle cancellation # Handle cancellation
@@ -404,6 +430,19 @@ class DownloadManager:
self._active_downloads[task_id]["bytes_per_second"] = 0.0 self._active_downloads[task_id]["bytes_per_second"] = 0.0
if self._active_downloads[task_id].get("transfer_backend") == "aria2": if self._active_downloads[task_id].get("transfer_backend") == "aria2":
await self._persist_aria2_state(task_id) await self._persist_aria2_state(task_id)
# Move queue item to history as canceled
try:
queue_service = await DownloadQueueService.get_instance()
await queue_service.complete_download(
download_id=task_id,
status="canceled",
)
except Exception:
logger.warning(
"Failed to cancel queue item for %s", task_id, exc_info=True
)
logger.info(f"Download cancelled for task {task_id}") logger.info(f"Download cancelled for task {task_id}")
raise raise
except Exception as e: except Exception as e:
@@ -417,6 +456,22 @@ class DownloadManager:
self._active_downloads[task_id]["bytes_per_second"] = 0.0 self._active_downloads[task_id]["bytes_per_second"] = 0.0
if self._active_downloads[task_id].get("transfer_backend") == "aria2": if self._active_downloads[task_id].get("transfer_backend") == "aria2":
await self._persist_aria2_state(task_id) await self._persist_aria2_state(task_id)
# Move queue item to history as failed
try:
queue_service = await DownloadQueueService.get_instance()
await queue_service.complete_download(
download_id=task_id,
status="failed",
error=str(e),
bytes_downloaded=self._active_downloads.get(task_id, {}).get("bytes_downloaded", 0),
total_bytes=self._active_downloads.get(task_id, {}).get("total_bytes"),
)
except Exception:
logger.warning(
"Failed to complete queue item for %s", task_id, exc_info=True
)
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
finally: finally:
# Schedule cleanup of download record after delay # Schedule cleanup of download record after delay