feat: implement task cancellation for model scanning and bulk operations

This commit is contained in:
Will Miao
2026-01-02 18:48:28 +08:00
parent 953117efa1
commit 837c32c42f
24 changed files with 505 additions and 219 deletions

View File

@@ -36,11 +36,13 @@ class AutoOrganizeResult:
self.results_truncated: bool = False
self.sample_results: List[Dict[str, Any]] = []
self.is_flat_structure: bool = False
self.status: str = 'success'
def to_dict(self) -> Dict[str, Any]:
"""Convert result to dictionary"""
result = {
'success': True,
'success': self.status != 'error',
'status': self.status,
'message': f'Auto-organize {self.operation_type} completed: {self.success_count} moved, {self.skipped_count} skipped, {self.failure_count} failed out of {self.total} total',
'summary': {
'total': self.total,
@@ -98,6 +100,8 @@ class ModelFileService:
result = AutoOrganizeResult()
source_directories: Set[str] = set()
self.scanner.reset_cancellation()
try:
# Get all models from cache
cache = await self.scanner.get_cached_data()
@@ -186,6 +190,21 @@ class ModelFileService:
progress_callback,
source_directories # Pass the set to track source directories
)
if self.scanner.is_cancelled():
result.status = 'cancelled'
if progress_callback:
await progress_callback.on_progress({
'type': 'auto_organize_progress',
'status': 'cancelled',
'total': result.total,
'processed': result.processed,
'success': result.success_count,
'failures': result.failure_count,
'skipped': result.skipped_count,
'operation_type': result.operation_type
})
return result
# Send cleanup progress
if progress_callback:
@@ -246,9 +265,15 @@ class ModelFileService:
"""Process models in batches to avoid overwhelming the system"""
for i in range(0, result.total, AUTO_ORGANIZE_BATCH_SIZE):
if self.scanner.is_cancelled():
logger.info(f"{self.model_type.capitalize()} File Service: Auto-organize cancelled by user")
break
batch = all_models[i:i + AUTO_ORGANIZE_BATCH_SIZE]
for model in batch:
if self.scanner.is_cancelled():
break
await self._process_single_model(model, model_roots, result, source_directories)
result.processed += 1
@@ -535,8 +560,12 @@ class ModelMoveService:
"""
try:
results = []
self.scanner.reset_cancellation()
for file_path in file_paths:
if self.scanner.is_cancelled():
logger.info(f"{self.model_type.capitalize()} Move Service: Bulk move cancelled by user")
break
result = await self.move_model(file_path, target_path, use_default_paths=use_default_paths)
results.append({
"original_file_path": file_path,

View File

@@ -84,6 +84,7 @@ class ModelScanner:
self._excluded_models = [] # List to track excluded models
self._persistent_cache = get_persistent_cache()
self._name_display_mode = self._resolve_name_display_mode()
self._cancel_requested = False # Flag for cancellation
try:
loop = asyncio.get_running_loop()
except RuntimeError:
@@ -678,6 +679,7 @@ class ModelScanner:
async def _reconcile_cache(self) -> None:
"""Fast cache reconciliation - only process differences between cache and filesystem"""
self.reset_cancellation()
self._is_initializing = True # Set flag for reconciliation duration
try:
start_time = time.time()
@@ -737,6 +739,9 @@ class ModelScanner:
# Yield control periodically
await asyncio.sleep(0)
if self.is_cancelled():
logger.info(f"{self.model_type.capitalize()} Scanner: Reconcile scan cancelled")
return
# Process new files in batches
total_added = 0
@@ -784,6 +789,10 @@ class ModelScanner:
logger.error(f"Could not determine root path for {path}")
except Exception as e:
logger.error(f"Error adding {path} to cache: {e}")
if self.is_cancelled():
logger.info(f"{self.model_type.capitalize()} Scanner: Reconcile processing cancelled")
return
# Find missing files (in cache but not in filesystem)
missing_files = cached_paths - found_paths
@@ -838,6 +847,19 @@ class ModelScanner:
"""Check if the scanner is currently initializing"""
return self._is_initializing
def cancel_task(self) -> None:
    """Ask the in-flight long-running task to stop at its next checkpoint.

    Sets the internal flag that cooperative loops poll via is_cancelled();
    it does not interrupt the task immediately.
    """
    self._cancel_requested = True
    message = f"{self.model_type.capitalize()} Scanner: Cancellation requested"
    logger.info(message)
def reset_cancellation(self) -> None:
    """Clear any previously requested cancellation before starting a new task."""
    self._cancel_requested = False
def is_cancelled(self) -> bool:
    """Report whether cancel_task() has been called since the last reset."""
    return self._cancel_requested
def get_model_roots(self) -> List[str]:
"""Get model root directories"""
raise NotImplementedError("Subclasses must implement get_model_roots")
@@ -1030,6 +1052,8 @@ class ModelScanner:
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Error reporting progress for {self.model_type}: {exc}")
self.reset_cancellation()
async def scan_recursive(current_path: str, root_path: str, visited_paths: Set[str]) -> None:
nonlocal processed_files
@@ -1073,6 +1097,8 @@ class ModelScanner:
await handle_progress()
await asyncio.sleep(0)
if self.is_cancelled():
return
elif entry.is_dir(follow_symlinks=True):
await scan_recursive(entry.path, root_path, visited_paths)
except Exception as entry_error:
@@ -1080,6 +1106,9 @@ class ModelScanner:
except Exception as scan_error:
logger.error(f"Error scanning {current_path}: {scan_error}")
if self.is_cancelled():
return
for model_root in self.get_model_roots():
if not os.path.exists(model_root):
continue
@@ -1448,6 +1477,10 @@ class ModelScanner:
deleted_models = []
for file_path in file_paths:
if self.is_cancelled():
logger.info(f"{self.model_type.capitalize()} Scanner: Bulk delete cancelled by user")
break
try:
target_dir = os.path.dirname(file_path)
base_name = os.path.basename(file_path)
@@ -1488,6 +1521,7 @@ class ModelScanner:
return {
'success': True,
'status': 'cancelled' if self.is_cancelled() else 'success',
'total_deleted': total_deleted,
'total_attempted': len(file_paths),
'cache_updated': cache_updated,

View File

@@ -466,6 +466,7 @@ class ModelUpdateService:
target_model_ids: Optional[Sequence[int]] = None,
) -> Dict[int, ModelUpdateRecord]:
"""Refresh update information for every model present in the cache."""
scanner.reset_cancellation()
normalized_targets = (
self._normalize_sequence(target_model_ids)
@@ -542,6 +543,9 @@ class ModelUpdateService:
force_refresh=force_refresh,
prefetched_response=prefetched.get(model_id),
)
if scanner.is_cancelled():
logger.info(f"{model_type.capitalize()} Update Service: Refresh cancelled by user")
return results
if record:
results[model_id] = record
if index % progress_interval == 0 or index == total_models:

View File

@@ -59,6 +59,8 @@ class BulkMetadataRefreshUseCase:
success = 0
needs_resort = False
self._service.scanner.reset_cancellation()
async def emit(status: str, **extra: Any) -> None:
if progress_callback is None:
return
@@ -69,6 +71,10 @@ class BulkMetadataRefreshUseCase:
await emit("started")
for model in to_process:
if self._service.scanner.is_cancelled():
self._logger.info("Bulk metadata refresh cancelled by user")
await emit("cancelled", processed=processed, success=success)
return {"success": False, "message": "Operation cancelled", "processed": processed, "updated": success, "total": total_models}
try:
original_name = model.get("model_name")
await MetadataManager.hydrate_model_data(model)