mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-06-14 14:39:25 -03:00
fix(downloads): delete history entries on retry and add dedup for bug #980
- retry_from_history() and retry_all_failed() now DELETE the original
history entry after re-queuing it. Previously the old entry stayed
in history causing exponential growth on repeated retry→cancel→retry
cycles.
- Add deduplicate() called once on singleton creation to clean up
existing duplicate queue/history entries left by the bug:
1. In-status dedup (keep highest id per model+version+status)
2. Cross-status dedup (prefer completed > failed > canceled)
3. Queue dedup (keep highest rowid per model+version)
4. Orphan queue cleanup (source='retry' entries obsoleted by
terminal history entries)
This commit is contained in:
@@ -82,6 +82,7 @@ class DownloadQueueService:
|
|||||||
async with cls._class_lock:
|
async with cls._class_lock:
|
||||||
if cls._instance is None:
|
if cls._instance is None:
|
||||||
cls._instance = cls()
|
cls._instance = cls()
|
||||||
|
await cls._instance.deduplicate()
|
||||||
return cls._instance
|
return cls._instance
|
||||||
|
|
||||||
def __init__(self, db_path: Optional[str] = None) -> None:
|
def __init__(self, db_path: Optional[str] = None) -> None:
|
||||||
@@ -608,7 +609,9 @@ class DownloadQueueService:
|
|||||||
|
|
||||||
Looks up the history record by its primary key. If the status is
|
Looks up the history record by its primary key. If the status is
|
||||||
``failed`` or ``canceled`` a new queue entry is created with the
|
``failed`` or ``canceled`` a new queue entry is created with the
|
||||||
same model metadata and a fresh download id.
|
same model metadata and a fresh download id, and the original
|
||||||
|
history entry is **deleted** to prevent exponential growth when
|
||||||
|
the retried item is later canceled or fails again and re-retried.
|
||||||
"""
|
"""
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
conn = self._get_conn()
|
conn = self._get_conn()
|
||||||
@@ -645,6 +648,10 @@ class DownloadQueueService:
|
|||||||
now,
|
now,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
conn.execute(
|
||||||
|
"DELETE FROM download_history WHERE id = ?",
|
||||||
|
(item_id,),
|
||||||
|
)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
queued = conn.execute(
|
queued = conn.execute(
|
||||||
"SELECT * FROM download_queue WHERE download_id = ?",
|
"SELECT * FROM download_queue WHERE download_id = ?",
|
||||||
@@ -656,6 +663,9 @@ class DownloadQueueService:
|
|||||||
async def retry_all_failed(self) -> int:
|
async def retry_all_failed(self) -> int:
|
||||||
"""Re-queue all failed and canceled downloads from history.
|
"""Re-queue all failed and canceled downloads from history.
|
||||||
|
|
||||||
|
Each history entry is **deleted** after being re-queued so that
|
||||||
|
repeated retry-all calls do not cause exponential growth.
|
||||||
|
|
||||||
Returns the number of items that were re-queued.
|
Returns the number of items that were re-queued.
|
||||||
"""
|
"""
|
||||||
async with self._lock:
|
async with self._lock:
|
||||||
@@ -691,6 +701,10 @@ class DownloadQueueService:
|
|||||||
now,
|
now,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
conn.execute(
|
||||||
|
"DELETE FROM download_history WHERE id = ?",
|
||||||
|
(row["id"],),
|
||||||
|
)
|
||||||
count += 1
|
count += 1
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
@@ -732,3 +746,126 @@ class DownloadQueueService:
|
|||||||
"failed": history_stats.get("failed", 0),
|
"failed": history_stats.get("failed", 0),
|
||||||
"canceled": history_stats.get("canceled", 0),
|
"canceled": history_stats.get("canceled", 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
# Deduplication (one-time cleanup for bug #980)
|
||||||
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def deduplicate(self) -> dict[str, int]:
|
||||||
|
"""Remove duplicate entries caused by the retry-amplification bug.
|
||||||
|
|
||||||
|
The bug (issue #980) caused the same download to appear N times in
|
||||||
|
both the queue and history tables when ``retry_all_failed`` was
|
||||||
|
called repeatedly without deleting the original history rows.
|
||||||
|
|
||||||
|
This method is called **once** when the singleton is first created.
|
||||||
|
It is idempotent — after the first run there will be no duplicates
|
||||||
|
to remove, so subsequent calls are a no-op.
|
||||||
|
|
||||||
|
Returns a dict with the count of removed rows per table.
|
||||||
|
"""
|
||||||
|
result: dict[str, int] = {
|
||||||
|
"removed_history": 0,
|
||||||
|
"removed_queue": 0,
|
||||||
|
"removed_orphan_queue": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
async with self._lock:
|
||||||
|
conn = self._get_conn()
|
||||||
|
|
||||||
|
# 1. History: for each (model_id, model_version_id, status) triplet
|
||||||
|
# keep only the row with the highest id (most recently inserted).
|
||||||
|
conn.execute("""
|
||||||
|
DELETE FROM download_history
|
||||||
|
WHERE id NOT IN (
|
||||||
|
SELECT MAX(id)
|
||||||
|
FROM download_history
|
||||||
|
GROUP BY model_id, model_version_id, status
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
result["removed_history"] = conn.execute(
|
||||||
|
"SELECT changes()"
|
||||||
|
).fetchone()[0]
|
||||||
|
|
||||||
|
# 2. Cross-status dedup: for each (model_id, model_version_id),
|
||||||
|
# keep only the entry with the highest-priority terminal status.
|
||||||
|
# Priority: completed (3) > failed (2) > canceled (1).
|
||||||
|
# This prevents the same model version from having both a
|
||||||
|
# 'failed' and a 'canceled' entry (or a 'completed' alongside
|
||||||
|
# either) after the bug-created duplicates are removed.
|
||||||
|
conn.execute("""
|
||||||
|
DELETE FROM download_history
|
||||||
|
WHERE id NOT IN (
|
||||||
|
SELECT dh.id
|
||||||
|
FROM download_history dh
|
||||||
|
INNER JOIN (
|
||||||
|
SELECT model_id, model_version_id,
|
||||||
|
MAX(CASE status
|
||||||
|
WHEN 'completed' THEN 3
|
||||||
|
WHEN 'failed' THEN 2
|
||||||
|
WHEN 'canceled' THEN 1
|
||||||
|
ELSE 0
|
||||||
|
END) AS best_prio
|
||||||
|
FROM download_history
|
||||||
|
GROUP BY model_id, model_version_id
|
||||||
|
) best
|
||||||
|
ON dh.model_id = best.model_id
|
||||||
|
AND dh.model_version_id = best.model_version_id
|
||||||
|
AND CASE dh.status
|
||||||
|
WHEN 'completed' THEN 3
|
||||||
|
WHEN 'failed' THEN 2
|
||||||
|
WHEN 'canceled' THEN 1
|
||||||
|
ELSE 0
|
||||||
|
END = best.best_prio
|
||||||
|
GROUP BY dh.model_id, dh.model_version_id
|
||||||
|
HAVING dh.id = MAX(dh.id)
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
result["removed_history"] += conn.execute(
|
||||||
|
"SELECT changes()"
|
||||||
|
).fetchone()[0]
|
||||||
|
|
||||||
|
# 3. Queue: for each (model_id, model_version_id) keep only the
|
||||||
|
# row with the latest added_at (most recently enqueued).
|
||||||
|
conn.execute("""
|
||||||
|
DELETE FROM download_queue
|
||||||
|
WHERE rowid NOT IN (
|
||||||
|
SELECT MAX(rowid)
|
||||||
|
FROM download_queue
|
||||||
|
WHERE status IN ('queued', 'downloading', 'paused', 'waiting')
|
||||||
|
GROUP BY model_id, model_version_id
|
||||||
|
)
|
||||||
|
AND status IN ('queued', 'downloading', 'paused', 'waiting')
|
||||||
|
""")
|
||||||
|
result["removed_queue"] = conn.execute(
|
||||||
|
"SELECT changes()"
|
||||||
|
).fetchone()[0]
|
||||||
|
|
||||||
|
# 4. Remove orphaned queue entries — items that were re-queued
|
||||||
|
# (source='retry') but whose model version already has a
|
||||||
|
# terminal history entry. These are artifacts of the buggy
|
||||||
|
# retry cycle that were never cleaned up.
|
||||||
|
conn.execute("""
|
||||||
|
DELETE FROM download_queue
|
||||||
|
WHERE source = 'retry'
|
||||||
|
AND (model_id, model_version_id) IN (
|
||||||
|
SELECT model_id, model_version_id
|
||||||
|
FROM download_history
|
||||||
|
WHERE status IN ('failed', 'canceled')
|
||||||
|
)
|
||||||
|
AND status IN ('queued', 'waiting')
|
||||||
|
""")
|
||||||
|
result["removed_orphan_queue"] = conn.execute(
|
||||||
|
"SELECT changes()"
|
||||||
|
).fetchone()[0]
|
||||||
|
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Deduplicate: removed %s history rows, %s queue rows, "
|
||||||
|
"%s orphaned queue rows",
|
||||||
|
result["removed_history"],
|
||||||
|
result["removed_queue"],
|
||||||
|
result["removed_orphan_queue"],
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|||||||
Reference in New Issue
Block a user