feat: optimize model update checking with bulk operations

- Refactor update filter logic to use bulk update checks when available
- Add annotation method to attach update flags to response items
- Improve performance by reducing API calls for update status checks
- Maintain backward compatibility with fallback to individual checks
- Handle edge cases and logging for failed update status resolutions
This commit is contained in:
Will Miao
2025-10-25 15:31:33 +08:00
parent 58cafdb713
commit d9a6db3359
4 changed files with 314 additions and 75 deletions

View File

@@ -72,31 +72,35 @@ class BaseModelService(ABC):
if hash_filters:
filtered_data = await self._apply_hash_filters(sorted_data, hash_filters)
return self._paginate(filtered_data, page, page_size)
filtered_data = await self._apply_common_filters(
sorted_data,
folder=folder,
base_models=base_models,
tags=tags,
favorites_only=favorites_only,
search_options=search_options,
)
if search:
filtered_data = await self._apply_search_filters(
filtered_data,
search,
fuzzy_search,
search_options,
else:
filtered_data = await self._apply_common_filters(
sorted_data,
folder=folder,
base_models=base_models,
tags=tags,
favorites_only=favorites_only,
search_options=search_options,
)
filtered_data = await self._apply_specific_filters(filtered_data, **kwargs)
if search:
filtered_data = await self._apply_search_filters(
filtered_data,
search,
fuzzy_search,
search_options,
)
if has_update:
filtered_data = await self._apply_update_filter(filtered_data)
filtered_data = await self._apply_specific_filters(filtered_data, **kwargs)
return self._paginate(filtered_data, page, page_size)
if has_update:
filtered_data = await self._apply_update_filter(filtered_data)
paginated = self._paginate(filtered_data, page, page_size)
paginated['items'] = await self._annotate_update_flags(
paginated['items'],
assume_true=has_update,
)
return paginated
async def _apply_hash_filters(self, data: List[Dict], hash_filters: Dict) -> List[Dict]:
@@ -167,35 +171,141 @@ class BaseModelService(ABC):
)
return []
candidates: List[tuple[Dict, int]] = []
id_to_items: Dict[int, List[Dict]] = {}
ordered_ids: List[int] = []
for item in data:
model_id = self._extract_model_id(item)
if model_id is not None:
candidates.append((item, model_id))
if model_id is None:
continue
if model_id not in id_to_items:
id_to_items[model_id] = []
ordered_ids.append(model_id)
id_to_items[model_id].append(item)
if not candidates:
if not ordered_ids:
return []
tasks = [
self.update_service.has_update(self.model_type, model_id)
for _, model_id in candidates
]
results = await asyncio.gather(*tasks, return_exceptions=True)
resolved: Optional[Dict[int, bool]] = None
bulk_method = getattr(self.update_service, "has_updates_bulk", None)
if callable(bulk_method):
try:
resolved = await bulk_method(self.model_type, ordered_ids)
except Exception as exc:
logger.error(
"Failed to resolve update status in bulk for %s models (%s): %s",
self.model_type,
ordered_ids,
exc,
exc_info=True,
)
resolved = None
if resolved is None:
tasks = [
self.update_service.has_update(self.model_type, model_id)
for model_id in ordered_ids
]
results = await asyncio.gather(*tasks, return_exceptions=True)
resolved = {}
for model_id, result in zip(ordered_ids, results):
if isinstance(result, Exception):
logger.error(
"Failed to resolve update status for model %s (%s): %s",
model_id,
self.model_type,
result,
)
continue
resolved[model_id] = bool(result)
filtered: List[Dict] = []
for (item, model_id), result in zip(candidates, results):
if isinstance(result, Exception):
logger.error(
"Failed to resolve update status for model %s (%s): %s",
model_id,
self.model_type,
result,
)
continue
if result:
for item in data:
model_id = self._extract_model_id(item)
if model_id is not None and resolved.get(model_id, False):
filtered.append(item)
return filtered
async def _annotate_update_flags(
self,
items: List[Dict],
*,
assume_true: bool = False,
) -> List[Dict]:
"""Attach an update_available flag to each response item.
Items without a civitai model id default to False. When the caller already
filtered for updates we can skip the lookup and mark everything True.
"""
if not items:
return []
annotated = [dict(item) for item in items]
if assume_true:
for item in annotated:
item['update_available'] = True
return annotated
if self.update_service is None:
for item in annotated:
item['update_available'] = False
return annotated
id_to_items: Dict[int, List[Dict]] = {}
ordered_ids: List[int] = []
for item in annotated:
model_id = self._extract_model_id(item)
if model_id is None:
item['update_available'] = False
continue
if model_id not in id_to_items:
id_to_items[model_id] = []
ordered_ids.append(model_id)
id_to_items[model_id].append(item)
if not ordered_ids:
return annotated
resolved: Optional[Dict[int, bool]] = None
bulk_method = getattr(self.update_service, "has_updates_bulk", None)
if callable(bulk_method):
try:
resolved = await bulk_method(self.model_type, ordered_ids)
except Exception as exc:
logger.error(
"Failed to resolve update status in bulk for %s models (%s): %s",
self.model_type,
ordered_ids,
exc,
exc_info=True,
)
resolved = None
if resolved is None:
tasks = [
self.update_service.has_update(self.model_type, model_id)
for model_id in ordered_ids
]
results = await asyncio.gather(*tasks, return_exceptions=True)
resolved = {}
for model_id, result in zip(ordered_ids, results):
if isinstance(result, Exception):
logger.error(
"Failed to resolve update status for model %s (%s): %s",
model_id,
self.model_type,
result,
)
continue
resolved[model_id] = bool(result)
for model_id, items_for_id in id_to_items.items():
flag = bool(resolved.get(model_id, False))
for item in items_for_id:
item['update_available'] = flag
return annotated
@staticmethod
def _extract_model_id(item: Dict) -> Optional[int]:
civitai = item.get('civitai') if isinstance(item, dict) else None