diff --git a/py/services/download_queue_service.py b/py/services/download_queue_service.py index 1aaf70bc..7996078f 100644 --- a/py/services/download_queue_service.py +++ b/py/services/download_queue_service.py @@ -82,6 +82,7 @@ class DownloadQueueService: async with cls._class_lock: if cls._instance is None: cls._instance = cls() + await cls._instance.deduplicate() return cls._instance def __init__(self, db_path: Optional[str] = None) -> None: @@ -608,7 +609,9 @@ class DownloadQueueService: Looks up the history record by its primary key. If the status is ``failed`` or ``canceled`` a new queue entry is created with the - same model metadata and a fresh download id. + same model metadata and a fresh download id, and the original + history entry is **deleted** to prevent exponential growth when + the retried item is later canceled or fails again and re-retried. """ async with self._lock: conn = self._get_conn() @@ -645,6 +648,10 @@ class DownloadQueueService: now, ), ) + conn.execute( + "DELETE FROM download_history WHERE id = ?", + (item_id,), + ) conn.commit() queued = conn.execute( "SELECT * FROM download_queue WHERE download_id = ?", @@ -656,6 +663,9 @@ class DownloadQueueService: async def retry_all_failed(self) -> int: """Re-queue all failed and canceled downloads from history. + Each history entry is **deleted** after being re-queued so that + repeated retry-all calls do not cause exponential growth. + Returns the number of items that were re-queued. """ async with self._lock: @@ -691,6 +701,10 @@ class DownloadQueueService: now, ), ) + conn.execute( + "DELETE FROM download_history WHERE id = ?", + (row["id"],), + ) count += 1 conn.commit() @@ -732,3 +746,126 @@ class DownloadQueueService: "failed": history_stats.get("failed", 0), "canceled": history_stats.get("canceled", 0), } + + # ------------------------------------------------------------------ + # Deduplication (one-time cleanup for bug #980) + # ------------------------------------------------------------------ + + async def deduplicate(self) -> dict[str, int]: + """Remove duplicate entries caused by the retry-amplification bug. + + The bug (issue #980) caused the same download to appear N times in + both the queue and history tables when ``retry_all_failed`` was + called repeatedly without deleting the original history rows. + + This method is called **once** when the singleton is first created. + It is idempotent — after the first run there will be no duplicates + to remove, so subsequent calls are a no-op. + + Returns a dict with the count of removed rows per table. + """ + result: dict[str, int] = { + "removed_history": 0, + "removed_queue": 0, + "removed_orphan_queue": 0, + } + + async with self._lock: + conn = self._get_conn() + + # 1. History: for each (model_id, model_version_id, status) triplet + # keep only the row with the highest id (most recently inserted). + conn.execute(""" + DELETE FROM download_history + WHERE id NOT IN ( + SELECT MAX(id) + FROM download_history + GROUP BY model_id, model_version_id, status + ) + """) + result["removed_history"] = conn.execute( + "SELECT changes()" + ).fetchone()[0] + + # 2. Cross-status dedup: for each (model_id, model_version_id), + # keep only the entry with the highest-priority terminal status. + # Priority: completed (3) > failed (2) > canceled (1). + # This prevents the same model version from having both a + # 'failed' and a 'canceled' entry (or a 'completed' alongside + # either) after the bug-created duplicates are removed. + conn.execute(""" + DELETE FROM download_history + WHERE id NOT IN ( + SELECT dh.id + FROM download_history dh + INNER JOIN ( + SELECT model_id, model_version_id, + MAX(CASE status + WHEN 'completed' THEN 3 + WHEN 'failed' THEN 2 + WHEN 'canceled' THEN 1 + ELSE 0 + END) AS best_prio + FROM download_history + GROUP BY model_id, model_version_id + ) best + ON dh.model_id = best.model_id + AND dh.model_version_id = best.model_version_id + AND CASE dh.status + WHEN 'completed' THEN 3 + WHEN 'failed' THEN 2 + WHEN 'canceled' THEN 1 + ELSE 0 + END = best.best_prio + GROUP BY dh.model_id, dh.model_version_id + HAVING dh.id = MAX(dh.id) + ) + """) + result["removed_history"] += conn.execute( + "SELECT changes()" + ).fetchone()[0] + + # 3. Queue: for each (model_id, model_version_id) keep only the + # row with the latest added_at (most recently enqueued). + conn.execute(""" + DELETE FROM download_queue + WHERE rowid NOT IN ( + SELECT MAX(rowid) + FROM download_queue + WHERE status IN ('queued', 'downloading', 'paused', 'waiting') + GROUP BY model_id, model_version_id + ) + AND status IN ('queued', 'downloading', 'paused', 'waiting') + """) + result["removed_queue"] = conn.execute( + "SELECT changes()" + ).fetchone()[0] + + # 4. Remove orphaned queue entries — items that were re-queued + # (source='retry') but whose model version already has a + # terminal history entry. These are artifacts of the buggy + # retry cycle that were never cleaned up. + conn.execute(""" + DELETE FROM download_queue + WHERE source = 'retry' + AND (model_id, model_version_id) IN ( + SELECT model_id, model_version_id + FROM download_history + WHERE status IN ('failed', 'canceled') + ) + AND status IN ('queued', 'waiting') + """) + result["removed_orphan_queue"] = conn.execute( + "SELECT changes()" + ).fetchone()[0] + + conn.commit() + + logger.info( + "Deduplicate: removed %s history rows, %s queue rows, " + "%s orphaned queue rows", + result["removed_history"], + result["removed_queue"], + result["removed_orphan_queue"], + ) + return result