perf(check-model-exists): eliminate SQLite connection-per-query overhead and skip redundant history checks

Root cause: 231 concurrent /check-model-exists requests on a 175K-lora library
caused ~9.4s of wall-clock time. The bottleneck was two-fold:

1. DownloadedVersionHistoryService opened a new sqlite3.connect() for every
   query under an asyncio.Lock. With a large WAL from 175K entries, each
   connect() took ~8ms; serialized by the lock across 231 requests, the
   last request waited ~1.8s just for lock acquisition.

2. check_model_exists always queried download history even when the model
   was found locally. The history result (hasBeenDownloaded /
   downloadedVersionIds) is only used by the UI when the model is NOT
   found locally; when found, the 'in library' indicator takes priority.

Changes:
- downloaded_version_history_service.py: added persistent _get_conn() that
  creates the SQLite connection once and reuses it across all queries
- misc_handlers.py: early-return from check_model_exists when the model
  exists locally, bypassing the history service entirely (lock skipped)

Expected: per-request wait time drops from ~1912ms to <3ms, and wall-clock
time from ~9.4s to <0.3s for the 175K-lora user's 231-card page.
This commit is contained in:
Will Miao
2026-05-02 13:31:20 +08:00
parent 502b7eab31
commit d324b57274
2 changed files with 159 additions and 142 deletions

View File

@@ -1791,29 +1791,33 @@ class ModelLibraryHandler:
exists = True exists = True
model_type = "embedding" model_type = "embedding"
if exists:
return web.json_response(
{
"success": True,
"exists": True,
"modelType": model_type,
"hasBeenDownloaded": False,
}
)
history_service = await self._get_download_history_service() history_service = await self._get_download_history_service()
has_been_downloaded = False has_been_downloaded = False
history_type = model_type history_type = None
if history_type: for candidate_type in ("lora", "checkpoint", "embedding"):
has_been_downloaded = await history_service.has_been_downloaded( if await history_service.has_been_downloaded(
history_type, candidate_type,
model_version_id, model_version_id,
) ):
else: has_been_downloaded = True
for candidate_type in ("lora", "checkpoint", "embedding"): history_type = candidate_type
if await history_service.has_been_downloaded( break
candidate_type,
model_version_id,
):
has_been_downloaded = True
history_type = candidate_type
break
return web.json_response( return web.json_response(
{ {
"success": True, "success": True,
"exists": exists, "exists": False,
"modelType": model_type if exists else history_type, "modelType": history_type,
"hasBeenDownloaded": has_been_downloaded, "hasBeenDownloaded": has_been_downloaded,
} }
) )
@@ -1833,40 +1837,46 @@ class ModelLibraryHandler:
model_type = None model_type = None
versions = [] versions = []
downloaded_version_ids = [] downloaded_version_ids = []
history_service = await self._get_download_history_service()
if lora_versions: if lora_versions:
model_type = "lora" return web.json_response(
versions = self._with_downloaded_flag(lora_versions) {
downloaded_version_ids = await history_service.get_downloaded_version_ids( "success": True,
model_type, "modelType": "lora",
model_id, "versions": self._with_downloaded_flag(lora_versions),
"downloadedVersionIds": [],
}
) )
elif checkpoint_versions: if checkpoint_versions:
model_type = "checkpoint" return web.json_response(
versions = self._with_downloaded_flag(checkpoint_versions) {
downloaded_version_ids = await history_service.get_downloaded_version_ids( "success": True,
model_type, "modelType": "checkpoint",
model_id, "versions": self._with_downloaded_flag(checkpoint_versions),
"downloadedVersionIds": [],
}
) )
elif embedding_versions: if embedding_versions:
model_type = "embedding" return web.json_response(
versions = self._with_downloaded_flag(embedding_versions) {
downloaded_version_ids = await history_service.get_downloaded_version_ids( "success": True,
model_type, "modelType": "embedding",
model_id, "versions": self._with_downloaded_flag(embedding_versions),
"downloadedVersionIds": [],
}
) )
else:
for candidate_type in ("lora", "checkpoint", "embedding"): history_service = await self._get_download_history_service()
candidate_downloaded_version_ids = ( for candidate_type in ("lora", "checkpoint", "embedding"):
await history_service.get_downloaded_version_ids( candidate_downloaded_version_ids = (
candidate_type, await history_service.get_downloaded_version_ids(
model_id, candidate_type,
) model_id,
) )
if candidate_downloaded_version_ids: )
model_type = candidate_type if candidate_downloaded_version_ids:
downloaded_version_ids = candidate_downloaded_version_ids model_type = candidate_type
break downloaded_version_ids = candidate_downloaded_version_ids
break
return web.json_response( return web.json_response(
{ {

View File

@@ -64,6 +64,7 @@ class DownloadedVersionHistoryService:
self._db_path = db_path or _resolve_database_path() self._db_path = db_path or _resolve_database_path()
self._settings = settings_manager or get_settings_manager() self._settings = settings_manager or get_settings_manager()
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
self._conn: sqlite3.Connection | None = None
self._schema_initialized = False self._schema_initialized = False
self._ensure_directory() self._ensure_directory()
self._initialize_schema() self._initialize_schema()
@@ -78,6 +79,12 @@ class DownloadedVersionHistoryService:
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
return conn return conn
def _get_conn(self) -> sqlite3.Connection:
if self._conn is None:
self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
return self._conn
def _initialize_schema(self) -> None: def _initialize_schema(self) -> None:
if self._schema_initialized: if self._schema_initialized:
return return
@@ -116,33 +123,33 @@ class DownloadedVersionHistoryService:
timestamp = time.time() timestamp = time.time()
async with self._lock: async with self._lock:
with self._connect() as conn: conn = self._get_conn()
conn.execute( conn.execute(
""" """
INSERT INTO downloaded_model_versions ( INSERT INTO downloaded_model_versions (
model_type, version_id, model_id, first_seen_at, last_seen_at, model_type, version_id, model_id, first_seen_at, last_seen_at,
source, last_file_path, last_library_name, is_deleted_override source, last_file_path, last_library_name, is_deleted_override
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)
ON CONFLICT(model_type, version_id) DO UPDATE SET ON CONFLICT(model_type, version_id) DO UPDATE SET
model_id = COALESCE(excluded.model_id, downloaded_model_versions.model_id), model_id = COALESCE(excluded.model_id, downloaded_model_versions.model_id),
last_seen_at = excluded.last_seen_at, last_seen_at = excluded.last_seen_at,
source = excluded.source, source = excluded.source,
last_file_path = COALESCE(excluded.last_file_path, downloaded_model_versions.last_file_path), last_file_path = COALESCE(excluded.last_file_path, downloaded_model_versions.last_file_path),
last_library_name = COALESCE(excluded.last_library_name, downloaded_model_versions.last_library_name), last_library_name = COALESCE(excluded.last_library_name, downloaded_model_versions.last_library_name),
is_deleted_override = 0 is_deleted_override = 0
""", """,
( (
normalized_type, normalized_type,
normalized_version_id, normalized_version_id,
normalized_model_id, normalized_model_id,
timestamp, timestamp,
timestamp, timestamp,
source, source,
file_path, file_path,
active_library_name, active_library_name,
), ),
) )
conn.commit() conn.commit()
async def mark_downloaded_bulk( async def mark_downloaded_bulk(
self, self,
@@ -180,24 +187,24 @@ class DownloadedVersionHistoryService:
return return
async with self._lock: async with self._lock:
with self._connect() as conn: conn = self._get_conn()
conn.executemany( conn.executemany(
""" """
INSERT INTO downloaded_model_versions ( INSERT INTO downloaded_model_versions (
model_type, version_id, model_id, first_seen_at, last_seen_at, model_type, version_id, model_id, first_seen_at, last_seen_at,
source, last_file_path, last_library_name, is_deleted_override source, last_file_path, last_library_name, is_deleted_override
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0) ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)
ON CONFLICT(model_type, version_id) DO UPDATE SET ON CONFLICT(model_type, version_id) DO UPDATE SET
model_id = COALESCE(excluded.model_id, downloaded_model_versions.model_id), model_id = COALESCE(excluded.model_id, downloaded_model_versions.model_id),
last_seen_at = excluded.last_seen_at, last_seen_at = excluded.last_seen_at,
source = excluded.source, source = excluded.source,
last_file_path = COALESCE(excluded.last_file_path, downloaded_model_versions.last_file_path), last_file_path = COALESCE(excluded.last_file_path, downloaded_model_versions.last_file_path),
last_library_name = COALESCE(excluded.last_library_name, downloaded_model_versions.last_library_name), last_library_name = COALESCE(excluded.last_library_name, downloaded_model_versions.last_library_name),
is_deleted_override = 0 is_deleted_override = 0
""", """,
payload, payload,
) )
conn.commit() conn.commit()
async def mark_not_downloaded(self, model_type: str, version_id: int) -> None: async def mark_not_downloaded(self, model_type: str, version_id: int) -> None:
normalized_type = _normalize_model_type(model_type) normalized_type = _normalize_model_type(model_type)
@@ -208,28 +215,28 @@ class DownloadedVersionHistoryService:
timestamp = time.time() timestamp = time.time()
async with self._lock: async with self._lock:
with self._connect() as conn: conn = self._get_conn()
conn.execute( conn.execute(
""" """
INSERT INTO downloaded_model_versions ( INSERT INTO downloaded_model_versions (
model_type, version_id, model_id, first_seen_at, last_seen_at, model_type, version_id, model_id, first_seen_at, last_seen_at,
source, last_file_path, last_library_name, is_deleted_override source, last_file_path, last_library_name, is_deleted_override
) VALUES (?, ?, NULL, ?, ?, 'manual', NULL, ?, 1) ) VALUES (?, ?, NULL, ?, ?, 'manual', NULL, ?, 1)
ON CONFLICT(model_type, version_id) DO UPDATE SET ON CONFLICT(model_type, version_id) DO UPDATE SET
last_seen_at = excluded.last_seen_at, last_seen_at = excluded.last_seen_at,
source = excluded.source, source = excluded.source,
last_library_name = COALESCE(excluded.last_library_name, downloaded_model_versions.last_library_name), last_library_name = COALESCE(excluded.last_library_name, downloaded_model_versions.last_library_name),
is_deleted_override = 1 is_deleted_override = 1
""", """,
( (
normalized_type, normalized_type,
normalized_version_id, normalized_version_id,
timestamp, timestamp,
timestamp, timestamp,
self._get_active_library_name(), self._get_active_library_name(),
), ),
) )
conn.commit() conn.commit()
async def has_been_downloaded(self, model_type: str, version_id: int) -> bool: async def has_been_downloaded(self, model_type: str, version_id: int) -> bool:
normalized_type = _normalize_model_type(model_type) normalized_type = _normalize_model_type(model_type)
@@ -238,15 +245,15 @@ class DownloadedVersionHistoryService:
return False return False
async with self._lock: async with self._lock:
with self._connect() as conn: conn = self._get_conn()
row = conn.execute( row = conn.execute(
""" """
SELECT is_deleted_override SELECT is_deleted_override
FROM downloaded_model_versions FROM downloaded_model_versions
WHERE model_type = ? AND version_id = ? WHERE model_type = ? AND version_id = ?
""", """,
(normalized_type, normalized_version_id), (normalized_type, normalized_version_id),
).fetchone() ).fetchone()
return bool(row) and not bool(row["is_deleted_override"]) return bool(row) and not bool(row["is_deleted_override"])
async def get_downloaded_version_ids( async def get_downloaded_version_ids(
@@ -258,16 +265,16 @@ class DownloadedVersionHistoryService:
return [] return []
async with self._lock: async with self._lock:
with self._connect() as conn: conn = self._get_conn()
rows = conn.execute( rows = conn.execute(
""" """
SELECT version_id SELECT version_id
FROM downloaded_model_versions FROM downloaded_model_versions
WHERE model_type = ? AND model_id = ? AND is_deleted_override = 0 WHERE model_type = ? AND model_id = ? AND is_deleted_override = 0
ORDER BY version_id ASC ORDER BY version_id ASC
""", """,
(normalized_type, normalized_model_id), (normalized_type, normalized_model_id),
).fetchall() ).fetchall()
return [int(row["version_id"]) for row in rows] return [int(row["version_id"]) for row in rows]
async def get_downloaded_version_ids_bulk( async def get_downloaded_version_ids_bulk(
@@ -291,17 +298,17 @@ class DownloadedVersionHistoryService:
params: list[object] = [normalized_type, *normalized_model_ids] params: list[object] = [normalized_type, *normalized_model_ids]
async with self._lock: async with self._lock:
with self._connect() as conn: conn = self._get_conn()
rows = conn.execute( rows = conn.execute(
f""" f"""
SELECT model_id, version_id SELECT model_id, version_id
FROM downloaded_model_versions FROM downloaded_model_versions
WHERE model_type = ? WHERE model_type = ?
AND model_id IN ({placeholders}) AND model_id IN ({placeholders})
AND is_deleted_override = 0 AND is_deleted_override = 0
""", """,
params, params,
).fetchall() ).fetchall()
result: dict[int, set[int]] = {} result: dict[int, set[int]] = {}
for row in rows: for row in rows: