Enhance Checkpoints Manager: Implement API integration for checkpoints, add filtering and sorting options, and improve UI components for better user experience

This commit is contained in:
Will Miao
2025-04-10 16:04:08 +08:00
parent 048d486fa6
commit 252e90a633
10 changed files with 877 additions and 41 deletions

View File

@@ -10,6 +10,7 @@ from datetime import datetime
from ..services.checkpoint_scanner import CheckpointScanner
from ..config import config
from ..services.settings_manager import settings
from ..utils.utils import fuzzy_match
logger = logging.getLogger(__name__)
@@ -25,9 +26,12 @@ class CheckpointsRoutes:
def setup_routes(self, app):
"""Register routes with the aiohttp app"""
app.router.add_get('/lora_manager/api/checkpoints', self.get_checkpoints)
app.router.add_get('/lora_manager/api/checkpoints/scan', self.scan_checkpoints)
app.router.add_get('/lora_manager/api/checkpoints/info/{name}', self.get_checkpoint_info)
app.router.add_get('/checkpoints', self.handle_checkpoints_page)
app.router.add_get('/api/checkpoints', self.get_checkpoints)
app.router.add_get('/api/checkpoints/base-models', self.get_base_models)
app.router.add_get('/api/checkpoints/top-tags', self.get_top_tags)
app.router.add_get('/api/checkpoints/scan', self.scan_checkpoints)
app.router.add_get('/api/checkpoints/info/{name}', self.get_checkpoint_info)
async def get_checkpoints(self, request):
"""Get paginated checkpoint data"""
@@ -76,8 +80,17 @@ class CheckpointsRoutes:
hash_filters=hash_filters
)
# Format response items
formatted_result = {
'items': [self._format_checkpoint_response(cp) for cp in result['items']],
'total': result['total'],
'page': result['page'],
'page_size': result['page_size'],
'total_pages': result['total_pages']
}
# Return as JSON
return web.json_response(result)
return web.json_response(formatted_result)
except Exception as e:
logger.error(f"Error in get_checkpoints: {e}", exc_info=True)
@@ -90,28 +103,122 @@ class CheckpointsRoutes:
"""Get paginated and filtered checkpoint data"""
cache = await self.scanner.get_cached_data()
# Implement similar filtering logic as in LoraScanner
# (Adapt code from LoraScanner.get_paginated_data)
# ...
# For now, a simplified implementation:
# Get default search options if not provided
if search_options is None:
search_options = {
'filename': True,
'modelname': True,
'tags': False,
'recursive': False,
}
# Get the base data set
filtered_data = cache.sorted_by_date if sort_by == 'date' else cache.sorted_by_name
# Apply basic folder filtering if needed
# Apply hash filtering if provided (highest priority)
if hash_filters:
single_hash = hash_filters.get('single_hash')
multiple_hashes = hash_filters.get('multiple_hashes')
if single_hash:
# Filter by single hash
single_hash = single_hash.lower() # Ensure lowercase for matching
filtered_data = [
cp for cp in filtered_data
if cp.get('sha256', '').lower() == single_hash
]
elif multiple_hashes:
# Filter by multiple hashes
hash_set = set(hash.lower() for hash in multiple_hashes) # Convert to set for faster lookup
filtered_data = [
cp for cp in filtered_data
if cp.get('sha256', '').lower() in hash_set
]
# Jump to pagination
total_items = len(filtered_data)
start_idx = (page - 1) * page_size
end_idx = min(start_idx + page_size, total_items)
result = {
'items': filtered_data[start_idx:end_idx],
'total': total_items,
'page': page,
'page_size': page_size,
'total_pages': (total_items + page_size - 1) // page_size
}
return result
# Apply SFW filtering if enabled in settings
if settings.get('show_only_sfw', False):
filtered_data = [
cp for cp in filtered_data
if not cp.get('preview_nsfw_level') or cp.get('preview_nsfw_level') < NSFW_LEVELS['R']
]
# Apply folder filtering
if folder is not None:
if search_options.get('recursive', False):
# Recursive folder filtering - include all subfolders
filtered_data = [
cp for cp in filtered_data
if cp['folder'].startswith(folder)
]
else:
# Exact folder filtering
filtered_data = [
cp for cp in filtered_data
if cp['folder'] == folder
]
# Apply base model filtering
if base_models and len(base_models) > 0:
filtered_data = [
cp for cp in filtered_data
if cp['folder'] == folder
if cp.get('base_model') in base_models
]
# Apply basic search if needed
# Apply tag filtering
if tags and len(tags) > 0:
filtered_data = [
cp for cp in filtered_data
if any(tag in cp.get('tags', []) for tag in tags)
]
# Apply search filtering
if search:
filtered_data = [
cp for cp in filtered_data
if search.lower() in cp['file_name'].lower() or
search.lower() in cp['model_name'].lower()
]
search_results = []
for cp in filtered_data:
# Search by file name
if search_options.get('filename', True):
if fuzzy_search:
if fuzzy_match(cp.get('file_name', ''), search):
search_results.append(cp)
continue
elif search.lower() in cp.get('file_name', '').lower():
search_results.append(cp)
continue
# Search by model name
if search_options.get('modelname', True):
if fuzzy_search:
if fuzzy_match(cp.get('model_name', ''), search):
search_results.append(cp)
continue
elif search.lower() in cp.get('model_name', '').lower():
search_results.append(cp)
continue
# Search by tags
if search_options.get('tags', False) and 'tags' in cp:
if any((fuzzy_match(tag, search) if fuzzy_search else search.lower() in tag.lower()) for tag in cp['tags']):
search_results.append(cp)
continue
filtered_data = search_results
# Calculate pagination
total_items = len(filtered_data)
start_idx = (page - 1) * page_size
@@ -127,6 +234,88 @@ class CheckpointsRoutes:
return result
def _format_checkpoint_response(self, checkpoint):
"""Format checkpoint data for API response"""
return {
"model_name": checkpoint["model_name"],
"file_name": checkpoint["file_name"],
"preview_url": config.get_preview_static_url(checkpoint.get("preview_url", "")),
"preview_nsfw_level": checkpoint.get("preview_nsfw_level", 0),
"base_model": checkpoint.get("base_model", ""),
"folder": checkpoint["folder"],
"sha256": checkpoint.get("sha256", ""),
"file_path": checkpoint["file_path"].replace(os.sep, "/"),
"file_size": checkpoint.get("size", 0),
"modified": checkpoint.get("modified", ""),
"tags": checkpoint.get("tags", []),
"modelDescription": checkpoint.get("modelDescription", ""),
"from_civitai": checkpoint.get("from_civitai", True),
"notes": checkpoint.get("notes", ""),
"model_type": checkpoint.get("model_type", "checkpoint"),
"civitai": self._filter_civitai_data(checkpoint.get("civitai", {}))
}
def _filter_civitai_data(self, data):
"""Filter relevant fields from CivitAI data"""
if not data:
return {}
fields = [
"id", "modelId", "name", "createdAt", "updatedAt",
"publishedAt", "trainedWords", "baseModel", "description",
"model", "images"
]
return {k: data[k] for k in fields if k in data}
async def get_top_tags(self, request: web.Request) -> web.Response:
"""Handle request for top tags sorted by frequency"""
try:
# Parse query parameters
limit = int(request.query.get('limit', '20'))
# Validate limit
if limit < 1 or limit > 100:
limit = 20 # Default to a reasonable limit
# Get top tags
top_tags = await self.scanner.get_top_tags(limit)
return web.json_response({
'success': True,
'tags': top_tags
})
except Exception as e:
logger.error(f"Error getting top tags: {str(e)}", exc_info=True)
return web.json_response({
'success': False,
'error': 'Internal server error'
}, status=500)
async def get_base_models(self, request: web.Request) -> web.Response:
"""Get base models used in loras"""
try:
# Parse query parameters
limit = int(request.query.get('limit', '20'))
# Validate limit
if limit < 1 or limit > 100:
limit = 20 # Default to a reasonable limit
# Get base models
base_models = await self.scanner.get_base_models(limit)
return web.json_response({
'success': True,
'base_models': base_models
})
except Exception as e:
logger.error(f"Error retrieving base models: {e}")
return web.json_response({
'success': False,
'error': str(e)
}, status=500)
async def scan_checkpoints(self, request):
"""Force a rescan of checkpoint files"""
try: