Add bulk delete functionality for loras and implement model duplicates management. See #198

- Introduced a new API endpoint for bulk deleting loras (a hedged sketch of the likely request flow follows the commit metadata below).
- Added ModelDuplicatesManager to handle duplicate models for loras and checkpoints.
- Implemented UI components for displaying duplicates and managing selections.
- Enhanced controls with a button for finding duplicates.
- Updated templates to include a duplicates banner and associated actions.
Author: Will Miao
Date: 2025-05-31 21:58:48 +08:00
Parent: 0bd62eef3a
Commit: 26f9779fbf

21 changed files with 935 additions and 60 deletions
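The route handler for the new bulk-delete endpoint lives in one of the 21 changed files not excerpted here. As a minimal sketch of the likely flow, assuming an aiohttp server and a scanner object in scope that exposes the `bulk_delete_models` coroutine shown further down; the route path and the `file_paths` field name are illustrative, not taken from the commit:

```python
from aiohttp import web

async def bulk_delete_loras(request: web.Request) -> web.Response:
    """Hypothetical handler: read a JSON list of paths, hand it to the scanner."""
    payload = await request.json()
    file_paths = payload.get("file_paths", [])  # illustrative field name
    # lora_scanner is assumed to be in scope and to expose bulk_delete_models
    result = await lora_scanner.bulk_delete_models(file_paths)
    return web.json_response(result, status=200 if result.get("success") else 400)

# Illustrative registration; the real path may differ:
# app.router.add_post("/api/loras/bulk-delete", bulk_delete_loras)
```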

Changes to the `ModelHashIndex` class (file path not shown in this capture):

```diff
@@ -2,7 +2,7 @@ from typing import Dict, Optional, Set, List
 import os
 
 class ModelHashIndex:
-    """Index for looking up models by hash or path"""
+    """Index for looking up models by hash or filename"""
 
     def __init__(self):
        self._hash_to_path: Dict[str, str] = {}
```
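For orientation (inferred from the methods below; the commit itself does not restate this): besides the two canonical mappings initialised here, the class also keeps `_duplicate_hashes` (hash → every path sharing that hash) and `_duplicate_filenames` (filename → every path sharing that filename). The removal logic below works by repairing the canonical mappings from those duplicate lists whenever an entry is deleted.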
```diff
@@ -66,36 +66,123 @@ class ModelHashIndex:
     def remove_by_path(self, file_path: str) -> None:
         """Remove entry by file path"""
         filename = self._get_filename_from_path(file_path)
-        if filename in self._filename_to_hash:
-            hash_val = self._filename_to_hash[filename]
-            if hash_val in self._hash_to_path:
-                del self._hash_to_path[hash_val]
-            del self._filename_to_hash[filename]
+        hash_val = None
+        # Find the hash for this file path
+        for h, p in self._hash_to_path.items():
+            if p == file_path:
+                hash_val = h
+                break
+
+        # If we didn't find a hash, nothing to do
+        if not hash_val:
+            return
+
+        # Update duplicates tracking for hash
+        if hash_val in self._duplicate_hashes:
+            # Remove the current path from duplicates
+            self._duplicate_hashes[hash_val] = [p for p in self._duplicate_hashes[hash_val] if p != file_path]
 
-        # Also clean up from duplicates tracking
-        if filename in self._duplicate_filenames:
-            self._duplicate_filenames[filename] = [p for p in self._duplicate_filenames[filename] if p != file_path]
-            if not self._duplicate_filenames[filename]:
-                del self._duplicate_filenames[filename]
-        if hash_val in self._duplicate_hashes:
-            self._duplicate_hashes[hash_val] = [p for p in self._duplicate_hashes[hash_val] if p != file_path]
-            if not self._duplicate_hashes[hash_val]:
-                del self._duplicate_hashes[hash_val]
+            # Update or remove hash mapping based on remaining duplicates
+            if len(self._duplicate_hashes[hash_val]) > 0:
+                # Replace with one of the remaining paths
+                new_path = self._duplicate_hashes[hash_val][0]
+                new_filename = self._get_filename_from_path(new_path)
+
+                # Update hash-to-path mapping
+                self._hash_to_path[hash_val] = new_path
+
+                # IMPORTANT: Update filename-to-hash mapping for consistency
+                # Remove old filename mapping if it points to this hash
+                if filename in self._filename_to_hash and self._filename_to_hash[filename] == hash_val:
+                    del self._filename_to_hash[filename]
+                # Add new filename mapping
+                self._filename_to_hash[new_filename] = hash_val
+
+                # If only one duplicate left, remove from duplicates tracking
+                if len(self._duplicate_hashes[hash_val]) == 1:
+                    del self._duplicate_hashes[hash_val]
+            else:
+                # No duplicates left, remove hash entry completely
+                del self._duplicate_hashes[hash_val]
+                del self._hash_to_path[hash_val]
+                # Remove corresponding filename entry if it points to this hash
+                if filename in self._filename_to_hash and self._filename_to_hash[filename] == hash_val:
+                    del self._filename_to_hash[filename]
+        else:
+            # No duplicates, simply remove the hash entry
+            del self._hash_to_path[hash_val]
+            # Remove corresponding filename entry if it points to this hash
+            if filename in self._filename_to_hash and self._filename_to_hash[filename] == hash_val:
+                del self._filename_to_hash[filename]
+
+        # Update duplicates tracking for filename
+        if filename in self._duplicate_filenames:
+            # Remove the current path from duplicates
+            self._duplicate_filenames[filename] = [p for p in self._duplicate_filenames[filename] if p != file_path]
+
+            # Update or remove filename mapping based on remaining duplicates
+            if len(self._duplicate_filenames[filename]) > 0:
+                # Get the hash for the first remaining duplicate path
+                first_dup_path = self._duplicate_filenames[filename][0]
+                first_dup_hash = None
+                for h, p in self._hash_to_path.items():
+                    if p == first_dup_path:
+                        first_dup_hash = h
+                        break
+
+                # Update the filename to hash mapping if we found a hash
+                if first_dup_hash:
+                    self._filename_to_hash[filename] = first_dup_hash
+
+                # If only one duplicate left, remove from duplicates tracking
+                if len(self._duplicate_filenames[filename]) == 1:
+                    del self._duplicate_filenames[filename]
+            else:
+                # No duplicates left, remove filename entry completely
+                del self._duplicate_filenames[filename]
+                if filename in self._filename_to_hash:
+                    del self._filename_to_hash[filename]
 
     def remove_by_hash(self, sha256: str) -> None:
         """Remove entry by hash"""
         sha256 = sha256.lower()
-        if sha256 in self._hash_to_path:
-            path = self._hash_to_path[sha256]
-            filename = self._get_filename_from_path(path)
-            if filename in self._filename_to_hash:
-                del self._filename_to_hash[filename]
-            del self._hash_to_path[sha256]
+        if sha256 not in self._hash_to_path:
+            return
+
+        # Get the path and filename
+        path = self._hash_to_path[sha256]
+        filename = self._get_filename_from_path(path)
+
+        # Get all paths for this hash (including duplicates)
+        paths_to_remove = [path]
+        if sha256 in self._duplicate_hashes:
+            paths_to_remove.extend(self._duplicate_hashes[sha256])
+            del self._duplicate_hashes[sha256]
+
+        # Remove hash-to-path mapping
+        del self._hash_to_path[sha256]
 
-        # Clean up from duplicates tracking
-        if sha256 in self._duplicate_hashes:
-            del self._duplicate_hashes[sha256]
+        # Update filename-to-hash and duplicate filenames for all paths
+        for path_to_remove in paths_to_remove:
+            fname = self._get_filename_from_path(path_to_remove)
+
+            # If this filename maps to the hash we're removing, remove it
+            if fname in self._filename_to_hash and self._filename_to_hash[fname] == sha256:
+                del self._filename_to_hash[fname]
+
+            # Update duplicate filenames tracking
+            if fname in self._duplicate_filenames:
+                self._duplicate_filenames[fname] = [p for p in self._duplicate_filenames[fname] if p != path_to_remove]
+                if not self._duplicate_filenames[fname]:
+                    del self._duplicate_filenames[fname]
+                elif len(self._duplicate_filenames[fname]) == 1:
+                    # If only one entry remains, it's no longer a duplicate
+                    del self._duplicate_filenames[fname]
 
     def has_hash(self, sha256: str) -> bool:
         """Check if hash exists in index"""
```

Changes to the `ModelScanner` class (file path not shown in this capture). First, a new log line reporting hash index statistics after each cache save:

```diff
@@ -133,6 +133,7 @@ class ModelScanner:
             os.rename(temp_path, cache_path)
 
             logger.info(f"Saved {self.model_type} cache with {len(self._cache.raw_data)} models to {cache_path}")
+            logger.info(f"Hash index stats - hash_to_path: {len(self._hash_index._hash_to_path)}, filename_to_hash: {len(self._hash_index._filename_to_hash)}, duplicate_hashes: {len(self._hash_index._duplicate_hashes)}, duplicate_filenames: {len(self._hash_index._duplicate_filenames)}")
             return True
         except Exception as e:
             logger.error(f"Error saving {self.model_type} cache to disk: {e}")
```
The same file gains three new methods, shown here in sequence. First, the public entry point:

```diff
@@ -1219,3 +1220,166 @@ class ModelScanner:
         # Save updated cache to disk
         await self._save_cache_to_disk()
         return updated
+
+    async def bulk_delete_models(self, file_paths: List[str]) -> Dict:
+        """Delete multiple models and update cache in a batch operation
+
+        Args:
+            file_paths: List of file paths to delete
+
+        Returns:
+            Dict containing results of the operation
+        """
+        try:
+            if not file_paths:
+                return {
+                    'success': False,
+                    'error': 'No file paths provided for deletion',
+                    'results': []
+                }
+
+            # Get the file monitor
+            file_monitor = getattr(self, 'file_monitor', None)
+
+            # Keep track of success and failures
+            results = []
+            total_deleted = 0
+            cache_updated = False
+
+            # Get cache data
+            cache = await self.get_cached_data()
+
+            # Track deleted models to update cache once
+            deleted_models = []
+
+            for file_path in file_paths:
+                try:
+                    target_dir = os.path.dirname(file_path)
+                    file_name = os.path.splitext(os.path.basename(file_path))[0]
+
+                    # Delete all associated files for the model
+                    from ..utils.routes_common import ModelRouteUtils
+                    deleted_files = await ModelRouteUtils.delete_model_files(
+                        target_dir,
+                        file_name,
+                        file_monitor
+                    )
+
+                    if deleted_files:
+                        deleted_models.append(file_path)
+                        results.append({
+                            'file_path': file_path,
+                            'success': True,
+                            'deleted_files': deleted_files
+                        })
+                        total_deleted += 1
+                    else:
+                        results.append({
+                            'file_path': file_path,
+                            'success': False,
+                            'error': 'No files deleted'
+                        })
+                except Exception as e:
+                    logger.error(f"Error deleting file {file_path}: {e}")
+                    results.append({
+                        'file_path': file_path,
+                        'success': False,
+                        'error': str(e)
+                    })
+
+            # Batch update cache if any models were deleted
+            if deleted_models:
+                # Update the cache in a batch operation
+                cache_updated = await self._batch_update_cache_for_deleted_models(deleted_models)
+
+            return {
+                'success': True,
+                'total_deleted': total_deleted,
+                'total_attempted': len(file_paths),
+                'cache_updated': cache_updated,
+                'results': results
+            }
+        except Exception as e:
+            logger.error(f"Error in bulk delete: {e}", exc_info=True)
+            return {
+                'success': False,
+                'error': str(e),
+                'results': []
+            }
```
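Not from the commit: an illustrative shape of the dictionary this coroutine returns, for a three-path request where one deletion failed. The exact contents of `deleted_files` depend on what `ModelRouteUtils.delete_model_files` returns; the paths here are placeholders:

```python
{
    'success': True,            # the batch itself ran; check per-path results
    'total_deleted': 2,
    'total_attempted': 3,
    'cache_updated': True,
    'results': [
        {'file_path': '/loras/a.safetensors', 'success': True,
         'deleted_files': ['/loras/a.safetensors']},   # illustrative
        {'file_path': '/loras/b.safetensors', 'success': True,
         'deleted_files': ['/loras/b.safetensors']},
        {'file_path': '/loras/c.safetensors', 'success': False,
         'error': 'No files deleted'},
    ]
}
```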
(hunk continued) Next, the batch cache update that the method above delegates to:

```diff
+    async def _batch_update_cache_for_deleted_models(self, file_paths: List[str]) -> bool:
+        """Update cache after multiple models have been deleted
+
+        Args:
+            file_paths: List of file paths that were deleted
+
+        Returns:
+            bool: True if cache was updated and saved successfully
+        """
+        if not file_paths or self._cache is None:
+            return False
+
+        try:
+            # Get all models that need to be removed from cache
+            models_to_remove = [item for item in self._cache.raw_data if item['file_path'] in file_paths]
+
+            if not models_to_remove:
+                return False
+
+            # Update tag counts
+            for model in models_to_remove:
+                for tag in model.get('tags', []):
+                    if tag in self._tags_count:
+                        self._tags_count[tag] = max(0, self._tags_count[tag] - 1)
+                        if self._tags_count[tag] == 0:
+                            del self._tags_count[tag]
+
+            # Update hash index
+            for model in models_to_remove:
+                file_path = model['file_path']
+                if hasattr(self, '_hash_index') and self._hash_index:
+                    # Get the hash and filename before removal for duplicate checking
+                    file_name = os.path.splitext(os.path.basename(file_path))[0]
+                    hash_val = model.get('sha256', '').lower()
+
+                    # Remove from hash index
+                    self._hash_index.remove_by_path(file_path)
+
+                    # Check and clean up duplicates
+                    self._cleanup_duplicates_after_removal(hash_val, file_name)
+
+            # Update cache data
+            self._cache.raw_data = [item for item in self._cache.raw_data if item['file_path'] not in file_paths]
+
+            # Resort cache
+            await self._cache.resort()
+
+            # Save updated cache to disk
+            await self._save_cache_to_disk()
+
+            return True
+        except Exception as e:
+            logger.error(f"Error updating cache after bulk delete: {e}", exc_info=True)
+            return False
```
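Worth noting about the batch helper above: the tag-count maintenance, the `raw_data` filter, the resort, and the disk write each run once per batch rather than once per deleted file, which is the point of routing bulk deletes through it instead of calling the single-model delete path in a loop.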
(hunk continued) Finally, the duplicate-list cleanup helper:

```diff
+    def _cleanup_duplicates_after_removal(self, hash_val: str, file_name: str) -> None:
+        """Clean up duplicate entries in hash index after removing a model
+
+        Args:
+            hash_val: SHA256 hash of the removed model
+            file_name: File name of the removed model without extension
+        """
+        if not hash_val or not file_name or not hasattr(self, '_hash_index'):
+            return
+
+        # Clean up hash duplicates if only 0 or 1 entries remain
+        if hash_val in self._hash_index._duplicate_hashes:
+            if len(self._hash_index._duplicate_hashes[hash_val]) <= 1:
+                del self._hash_index._duplicate_hashes[hash_val]
+
+        # Clean up filename duplicates if only 0 or 1 entries remain
+        if file_name in self._hash_index._duplicate_filenames:
+            if len(self._hash_index._duplicate_filenames[file_name]) <= 1:
+                del self._hash_index._duplicate_filenames[file_name]
```
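Also not from the commit: a minimal sketch of the cleanup rule in isolation. Since `_cleanup_duplicates_after_removal` only touches `self._hash_index`, a `SimpleNamespace` stands in for the scanner here; `ModelHashIndex` and `ModelScanner` are assumed importable from their modules:

```python
from types import SimpleNamespace

idx = ModelHashIndex()
idx._duplicate_hashes = {"abc123": ["/loras/b/copy.safetensors"]}  # one path left
idx._duplicate_filenames = {"model": ["/x/model.safetensors",
                                      "/y/model.safetensors"]}     # two paths left

fake_scanner = SimpleNamespace(_hash_index=idx)
ModelScanner._cleanup_duplicates_after_removal(fake_scanner, "abc123", "model")

assert "abc123" not in idx._duplicate_hashes   # <= 1 entry: stale, dropped
assert "model" in idx._duplicate_filenames     # still 2 entries: kept
```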