diff --git a/py/utils/routes_common.py b/py/utils/routes_common.py deleted file mode 100644 index 642bfcad..00000000 --- a/py/utils/routes_common.py +++ /dev/null @@ -1,868 +0,0 @@ -import os -import json -import logging -from typing import Dict, Callable, Awaitable -from aiohttp import web -from datetime import datetime - -from .model_utils import determine_base_model -from .constants import PREVIEW_EXTENSIONS, CARD_PREVIEW_WIDTH -from ..config import config -from ..services.service_registry import ServiceRegistry -from ..services.downloader import get_downloader -from ..utils.exif_utils import ExifUtils -from ..utils.metadata_manager import MetadataManager -from ..services.websocket_manager import ws_manager -from ..services.metadata_service import get_default_metadata_provider, get_metadata_provider -from ..services.settings_manager import settings - -logger = logging.getLogger(__name__) - -# TODO: retire this class -class ModelRouteUtils: - """Shared utilities for model routes (LoRAs, Checkpoints, etc.)""" - - @staticmethod - async def load_local_metadata(metadata_path: str) -> Dict: - """Load local metadata file""" - if os.path.exists(metadata_path): - try: - with open(metadata_path, 'r', encoding='utf-8') as f: - return json.load(f) - except Exception as e: - logger.error(f"Error loading metadata from {metadata_path}: {e}") - return {} - - @staticmethod - async def handle_not_found_on_civitai(metadata_path: str, local_metadata: Dict) -> None: - """Handle case when model is not found on CivitAI""" - local_metadata['from_civitai'] = False - await MetadataManager.save_metadata(metadata_path, local_metadata) - - @staticmethod - def is_civitai_api_metadata(meta: dict) -> bool: - """ - Determine if the given civitai metadata is from the civitai API. - Returns True if both 'files' and 'images' exist and are non-empty, - and the 'source' is not 'archive_db'. - """ - if not isinstance(meta, dict): - return False - files = meta.get('files') - images = meta.get('images') - source = meta.get('source') - return bool(files) and bool(images) and source != 'archive_db' - - @staticmethod - async def update_model_metadata(metadata_path: str, local_metadata: Dict, - civitai_metadata: Dict, metadata_provider=None) -> None: - """Update local metadata with CivitAI data""" - # Save existing trainedWords and customImages if they exist - existing_civitai = local_metadata.get('civitai') or {} # Use empty dict if None - - # Check if we should skip the update to avoid overwriting richer data - if civitai_metadata.get('source') == 'archive_db' and ModelRouteUtils.is_civitai_api_metadata(existing_civitai): - logger.info(f"Skip civitai update for {local_metadata.get('model_name', '')} ({existing_civitai.get('name', '')})") - else: - # Create a new civitai metadata by updating existing with new - merged_civitai = existing_civitai.copy() - merged_civitai.update(civitai_metadata) - - if civitai_metadata.get('source') == 'archive_db': - model_name = civitai_metadata.get('model', {}).get('name', '') - version_name = civitai_metadata.get('name', '') - logger.info(f"Recovered metadata from archive_db for deleted model: {model_name} ({version_name})") - - # Special handling for trainedWords - ensure we don't lose any existing trained words - if 'trainedWords' in existing_civitai: - existing_trained_words = existing_civitai.get('trainedWords', []) - new_trained_words = civitai_metadata.get('trainedWords', []) - # Use a set to combine words without duplicates, then convert back to list - merged_trained_words = list(set(existing_trained_words + new_trained_words)) - merged_civitai['trainedWords'] = merged_trained_words - - # Update local metadata with merged civitai data - local_metadata['civitai'] = merged_civitai - - # Update model-related metadata from civitai_metadata.model - if 'model' in civitai_metadata and civitai_metadata['model']: - model_data = civitai_metadata['model'] - - # Update model name if available and not already set - if model_data.get('name'): - local_metadata['model_name'] = model_data['name'] - - # Update modelDescription if missing or empty in local_metadata - if not local_metadata.get('modelDescription') and model_data.get('description'): - local_metadata['modelDescription'] = model_data['description'] - - # Update tags if missing or empty in local_metadata - if not local_metadata.get('tags') and model_data.get('tags'): - local_metadata['tags'] = model_data['tags'] - - # Update creator in civitai metadata if missing - if model_data.get('creator') and not local_metadata.get('civitai', {}).get('creator'): - if 'civitai' not in local_metadata: - local_metadata['civitai'] = {} - local_metadata['civitai']['creator'] = model_data['creator'] - - # Update base model - local_metadata['base_model'] = determine_base_model(civitai_metadata.get('baseModel')) - - # Update preview if needed - if not local_metadata.get('preview_url') or not os.path.exists(local_metadata['preview_url']): - first_preview = next((img for img in civitai_metadata.get('images', [])), None) - if (first_preview): - # Determine if content is video or image - is_video = first_preview['type'] == 'video' - - if is_video: - # For videos use .mp4 extension - preview_ext = '.mp4' - else: - # For images use .webp extension - preview_ext = '.webp' - - base_name = os.path.splitext(os.path.splitext(os.path.basename(metadata_path))[0])[0] - preview_filename = base_name + preview_ext - preview_path = os.path.join(os.path.dirname(metadata_path), preview_filename) - - if is_video: - # Download video as is using downloader - downloader = await get_downloader() - success, result = await downloader.download_file( - first_preview['url'], - preview_path, - use_auth=False - ) - if success: - local_metadata['preview_url'] = preview_path.replace(os.sep, '/') - local_metadata['preview_nsfw_level'] = first_preview.get('nsfwLevel', 0) - else: - # For images, download and then optimize to WebP using downloader - downloader = await get_downloader() - success, content, headers = await downloader.download_to_memory( - first_preview['url'], - use_auth=False - ) - if success: - try: - # Optimize and convert to WebP - optimized_data, _ = ExifUtils.optimize_image( - image_data=content, # Use downloaded content directly - target_width=CARD_PREVIEW_WIDTH, - format='webp', - quality=85, - preserve_metadata=False - ) - - # Save the optimized WebP image - with open(preview_path, 'wb') as f: - f.write(optimized_data) - - # Update metadata - local_metadata['preview_url'] = preview_path.replace(os.sep, '/') - local_metadata['preview_nsfw_level'] = first_preview.get('nsfwLevel', 0) - - except Exception as e: - logger.error(f"Error optimizing preview image: {e}") - # If optimization fails, save the original content - try: - with open(preview_path, 'wb') as f: - f.write(content) - local_metadata['preview_url'] = preview_path.replace(os.sep, '/') - local_metadata['preview_nsfw_level'] = first_preview.get('nsfwLevel', 0) - except Exception as save_error: - logger.error(f"Error saving preview image: {save_error}") - - # Save updated metadata - await MetadataManager.save_metadata(metadata_path, local_metadata) - - @staticmethod - async def fetch_and_update_model( - sha256: str, - file_path: str, - model_data: dict, - update_cache_func: Callable[[str, str, Dict], Awaitable[bool]] - ) -> tuple[bool, str]: - """Fetch and update metadata for a single model - - Args: - sha256: SHA256 hash of the model file - file_path: Path to the model file - model_data: The model object in cache to update - update_cache_func: Function to update the cache with new metadata - - Returns: - tuple[bool, str]: (success, error_message). When success is True, error_message is None. - """ - try: - # Validate input parameters - if not isinstance(model_data, dict): - error_msg = f"Invalid model_data type: {type(model_data)}" - logger.error(error_msg) - return False, error_msg - - metadata_path = os.path.splitext(file_path)[0] + '.metadata.json' - enable_metadata_archive_db = settings.get('enable_metadata_archive_db', False) - - if model_data.get('civitai_deleted') is True: - # If CivitAI deleted flag is set, skip CivitAI API provider - if not enable_metadata_archive_db or model_data.get('db_checked') is True: - return False, "CivitAI model is deleted and metadata archive DB is not enabled" - # Likely deleted from CivitAI, use archive_db if available - metadata_provider = await get_metadata_provider('sqlite') - else: - metadata_provider = await get_default_metadata_provider() - - civitai_metadata, error = await metadata_provider.get_model_by_hash(sha256) - if not civitai_metadata: - if error == "Model not found": - model_data['from_civitai'] = False - model_data['civitai_deleted'] = True - model_data['db_checked'] = enable_metadata_archive_db - model_data['last_checked_at'] = datetime.now().timestamp() - - # Remove 'folder' key from model_data if present before saving - data_to_save = model_data.copy() - data_to_save.pop('folder', None) - await MetadataManager.save_metadata(file_path, data_to_save) - - # For other errors, log and return False with error message - error_msg = f"Error fetching metadata: {error} (model_name={model_data.get('model_name', '')})" - logger.error(error_msg) - return False, error_msg - - model_data['from_civitai'] = True - model_data['civitai_deleted'] = civitai_metadata.get('source') == 'archive_db' - model_data['db_checked'] = enable_metadata_archive_db - model_data['last_checked_at'] = datetime.now().timestamp() - - local_metadata = model_data.copy() - local_metadata.pop('folder', None) # Remove 'folder' key if present - - # Update metadata - await ModelRouteUtils.update_model_metadata( - metadata_path, - local_metadata, - civitai_metadata, - metadata_provider - ) - - # Update cache object directly using safe .get() method - update_dict = { - 'model_name': local_metadata.get('model_name'), - 'preview_url': local_metadata.get('preview_url'), - 'civitai': local_metadata.get('civitai'), - } - model_data.update(update_dict) - - # Update cache using the provided function - await update_cache_func(file_path, file_path, local_metadata) - - return True, None - - except KeyError as e: - error_msg = f"Error fetching metadata - Missing key: {e} in model_data={model_data}" - logger.error(error_msg) - return False, error_msg - except Exception as e: - error_msg = f"Error fetching metadata: {str(e)}" - logger.error(error_msg, exc_info=True) # Include stack trace - return False, error_msg - - @staticmethod - def filter_civitai_data(data: Dict, minimal: bool = False) -> Dict: - """Filter relevant fields from CivitAI data""" - if not data: - return {} - - fields = ["id", "modelId", "name", "trainedWords"] if minimal else [ - "id", "modelId", "name", "createdAt", "updatedAt", - "publishedAt", "trainedWords", "baseModel", "description", - "model", "images", "customImages", "creator" - ] - return {k: data[k] for k in fields if k in data} - - @staticmethod - async def handle_fetch_civitai(request: web.Request, scanner) -> web.Response: - """Handle CivitAI metadata fetch request - - Args: - request: The aiohttp request - scanner: The model scanner instance with cache management methods - - Returns: - web.Response: The HTTP response with metadata on success - """ - try: - data = await request.json() - metadata_path = os.path.splitext(data['file_path'])[0] + '.metadata.json' - - # Check if model metadata exists - local_metadata = await ModelRouteUtils.load_local_metadata(metadata_path) - if not local_metadata or not local_metadata.get('sha256'): - return web.json_response({"success": False, "error": "No SHA256 hash found"}, status=400) - - # Get metadata provider and fetch from unified provider - metadata_provider = await get_default_metadata_provider() - - # Fetch and update metadata - civitai_metadata, error = await metadata_provider.get_model_by_hash(local_metadata["sha256"]) - if not civitai_metadata: - await ModelRouteUtils.handle_not_found_on_civitai(metadata_path, local_metadata) - return web.json_response({"success": False, "error": error}, status=404) - - await ModelRouteUtils.update_model_metadata(metadata_path, local_metadata, civitai_metadata, metadata_provider) - - # Update the cache - await scanner.update_single_model_cache(data['file_path'], data['file_path'], local_metadata) - - # Return the updated metadata along with success status - return web.json_response({"success": True, "metadata": local_metadata}) - - except Exception as e: - logger.error(f"Error fetching from CivitAI: {e}", exc_info=True) - return web.json_response({"success": False, "error": str(e)}, status=500) - - @staticmethod - async def handle_replace_preview(request: web.Request, scanner) -> web.Response: - """Handle preview image replacement request""" - try: - reader = await request.multipart() - - # Read preview file data - field = await reader.next() - if field.name != 'preview_file': - raise ValueError("Expected 'preview_file' field") - content_type = field.headers.get('Content-Type', 'image/png') - - # Try to get original filename if available - content_disposition = field.headers.get('Content-Disposition', '') - original_filename = None - import re - filename_match = re.search(r'filename="(.*?)"', content_disposition) - if filename_match: - original_filename = filename_match.group(1) - - preview_data = await field.read() - - # Read model path - field = await reader.next() - if field.name != 'model_path': - raise ValueError("Expected 'model_path' field") - model_path = (await field.read()).decode() - - # Read NSFW level - nsfw_level = 0 # Default to 0 (unknown) - field = await reader.next() - if field and field.name == 'nsfw_level': - try: - nsfw_level = int((await field.read()).decode()) - except (ValueError, TypeError): - logger.warning("Invalid NSFW level format, using default 0") - - # Save preview file - base_name = os.path.splitext(os.path.basename(model_path))[0] - folder = os.path.dirname(model_path) - - # Determine format based on content type and original filename - is_gif = False - if original_filename and original_filename.lower().endswith('.gif'): - is_gif = True - elif content_type.lower() == 'image/gif': - is_gif = True - - # Determine if content is video or image and handle specific formats - if content_type.startswith('video/'): - # For videos, preserve original format if possible - if original_filename: - extension = os.path.splitext(original_filename)[1].lower() - # Default to .mp4 if no extension or unrecognized - if not extension or extension not in ['.mp4', '.webm', '.mov', '.avi']: - extension = '.mp4' - else: - # Try to determine extension from content type - if 'webm' in content_type: - extension = '.webm' - else: - extension = '.mp4' # Default - optimized_data = preview_data # No optimization for videos - elif is_gif: - # Preserve GIF format without optimization - extension = '.gif' - optimized_data = preview_data - else: - # For other images, optimize and convert to WebP - optimized_data, _ = ExifUtils.optimize_image( - image_data=preview_data, - target_width=CARD_PREVIEW_WIDTH, - format='webp', - quality=85, - preserve_metadata=False - ) - extension = '.webp' - - # Delete any existing preview files for this model - for ext in PREVIEW_EXTENSIONS: - existing_preview = os.path.join(folder, base_name + ext) - if os.path.exists(existing_preview): - try: - os.remove(existing_preview) - logger.debug(f"Deleted existing preview: {existing_preview}") - except Exception as e: - logger.warning(f"Failed to delete existing preview {existing_preview}: {e}") - - preview_path = os.path.join(folder, base_name + extension).replace(os.sep, '/') - - with open(preview_path, 'wb') as f: - f.write(optimized_data) - - # Update preview path and NSFW level in metadata - metadata_path = os.path.splitext(model_path)[0] + '.metadata.json' - if os.path.exists(metadata_path): - try: - with open(metadata_path, 'r', encoding='utf-8') as f: - metadata = json.load(f) - - # Update preview_url and preview_nsfw_level in the metadata dict - metadata['preview_url'] = preview_path - metadata['preview_nsfw_level'] = nsfw_level - - await MetadataManager.save_metadata(model_path, metadata) - except Exception as e: - logger.error(f"Error updating metadata: {e}") - - # Update preview URL in scanner cache - await scanner.update_preview_in_cache(model_path, preview_path, nsfw_level) - - return web.json_response({ - "success": True, - "preview_url": config.get_preview_static_url(preview_path), - "preview_nsfw_level": nsfw_level - }) - - except Exception as e: - logger.error(f"Error replacing preview: {e}", exc_info=True) - return web.Response(text=str(e), status=500) - - @staticmethod - async def handle_download_model(request: web.Request) -> web.Response: - """Handle model download request""" - try: - download_manager = await ServiceRegistry.get_download_manager() - data = await request.json() - - # Get or generate a download ID - download_id = data.get('download_id', ws_manager.generate_download_id()) - - # Create progress callback with download ID - async def progress_callback(progress): - await ws_manager.broadcast_download_progress(download_id, { - 'status': 'progress', - 'progress': progress, - 'download_id': download_id - }) - - # Check which identifier is provided and convert to int - model_id = None - model_version_id = None - - if data.get('model_id'): - try: - model_id = int(data.get('model_id')) - except (TypeError, ValueError): - return web.json_response({ - 'success': False, - 'error': "Invalid model_id: Must be an integer" - }, status=400) - - # Convert model_version_id to int if provided - if data.get('model_version_id'): - try: - model_version_id = int(data.get('model_version_id')) - except (TypeError, ValueError): - return web.json_response({ - 'success': False, - 'error': "Invalid model_version_id: Must be an integer" - }, status=400) - - # At least one identifier is required - if not model_id and not model_version_id: - return web.json_response({ - 'success': False, - 'error': "Missing required parameter: Please provide either 'model_id' or 'model_version_id'" - }, status=400) - - use_default_paths = data.get('use_default_paths', False) - source = data.get('source') # Optional source parameter - - # Pass the download_id to download_from_civitai - result = await download_manager.download_from_civitai( - model_id=model_id, - model_version_id=model_version_id, - save_dir=data.get('model_root'), - relative_path=data.get('relative_path', ''), - use_default_paths=use_default_paths, - progress_callback=progress_callback, - download_id=download_id, # Pass download_id explicitly - source=source # Pass source parameter - ) - - # Include download_id in the response - result['download_id'] = download_id - - if not result.get('success', False): - error_message = result.get('error', 'Unknown error') - - return web.json_response({ - 'success': False, - 'error': error_message, - 'download_id': download_id - }, status=500) - - return web.json_response(result) - - except Exception as e: - error_message = str(e) - - # Check if this might be an early access error - if '401' in error_message: - logger.warning(f"Early access error (401): {error_message}") - return web.json_response({ - 'success': False, - 'error': "Early Access Restriction: This model requires purchase. Please buy early access on Civitai.com." - }, status=401) - - logger.error(f"Error downloading model: {error_message}") - return web.json_response({ - 'success': False, - 'error': error_message - }, status=500) - - @staticmethod - async def handle_cancel_download(request: web.Request) -> web.Response: - """Handle cancellation of a download task - - Args: - request: The aiohttp request - - Returns: - web.Response: The HTTP response - """ - try: - download_manager = await ServiceRegistry.get_download_manager() - download_id = request.match_info.get('download_id') - if not download_id: - return web.json_response({ - 'success': False, - 'error': 'Download ID is required' - }, status=400) - - result = await download_manager.cancel_download(download_id) - - # Notify clients about cancellation via WebSocket - await ws_manager.broadcast_download_progress(download_id, { - 'status': 'cancelled', - 'progress': 0, - 'download_id': download_id, - 'message': 'Download cancelled by user' - }) - - return web.json_response(result) - - except Exception as e: - logger.error(f"Error cancelling download: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e) - }, status=500) - - @staticmethod - async def handle_list_downloads(request: web.Request) -> web.Response: - """Get list of active downloads - - Args: - request: The aiohttp request - - Returns: - web.Response: The HTTP response with list of downloads - """ - try: - download_manager = await ServiceRegistry.get_download_manager() - result = await download_manager.get_active_downloads() - return web.json_response(result) - except Exception as e: - logger.error(f"Error listing downloads: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e) - }, status=500) - - @staticmethod - async def handle_relink_civitai(request: web.Request, scanner) -> web.Response: - """Handle CivitAI metadata re-linking request by model ID and/or version ID - - Args: - request: The aiohttp request - scanner: The model scanner instance with cache management methods - - Returns: - web.Response: The HTTP response - """ - try: - data = await request.json() - file_path = data.get('file_path') - model_id = int(data.get('model_id')) - model_version_id = None - if data.get('model_version_id'): - model_version_id = int(data.get('model_version_id')) - - if not file_path or not model_id: - return web.json_response({"success": False, "error": "Both file_path and model_id are required"}, status=400) - - metadata_path = os.path.splitext(file_path)[0] + '.metadata.json' - - # Check if model metadata exists - local_metadata = await ModelRouteUtils.load_local_metadata(metadata_path) - - # Get metadata provider and fetch metadata using get_model_version which includes more comprehensive data - metadata_provider = await get_default_metadata_provider() - civitai_metadata = await metadata_provider.get_model_version(model_id, model_version_id) - if not civitai_metadata: - error_msg = f"Model version not found on CivitAI for ID: {model_id}" - if model_version_id: - error_msg += f" with version: {model_version_id}" - return web.json_response({"success": False, "error": error_msg}, status=404) - - # Try to find the primary model file to get the SHA256 hash - primary_model_file = None - for file in civitai_metadata.get('files', []): - if file.get('primary', False) and file.get('type') == 'Model': - primary_model_file = file - break - - # Update the SHA256 hash in local metadata if available - if primary_model_file and primary_model_file.get('hashes', {}).get('SHA256'): - local_metadata['sha256'] = primary_model_file['hashes']['SHA256'].lower() - - # Update metadata with CivitAI information - await ModelRouteUtils.update_model_metadata(metadata_path, local_metadata, civitai_metadata, metadata_provider) - - # Update the cache - await scanner.update_single_model_cache(file_path, file_path, local_metadata) - - return web.json_response({ - "success": True, - "message": f"Model successfully re-linked to Civitai model {model_id}" + - (f" version {model_version_id}" if model_version_id else ""), - "hash": local_metadata.get('sha256', '') - }) - - except Exception as e: - logger.error(f"Error re-linking to CivitAI: {e}", exc_info=True) - return web.json_response({"success": False, "error": str(e)}, status=500) - - @staticmethod - async def handle_verify_duplicates(request: web.Request, scanner) -> web.Response: - """Handle verification of duplicate model hashes - - Args: - request: The aiohttp request - scanner: The model scanner instance with cache management methods - - Returns: - web.Response: The HTTP response with verification results - """ - try: - data = await request.json() - file_paths = data.get('file_paths', []) - - if not file_paths: - return web.json_response({ - 'success': False, - 'error': 'No file paths provided for verification' - }, status=400) - - # Results tracking - results = { - 'verified_as_duplicates': True, # Start true, set to false if any mismatch - 'mismatched_files': [], - 'new_hash_map': {} - } - - # Get expected hash from the first file's metadata - expected_hash = None - first_metadata_path = os.path.splitext(file_paths[0])[0] + '.metadata.json' - first_metadata = await ModelRouteUtils.load_local_metadata(first_metadata_path) - if first_metadata and 'sha256' in first_metadata: - expected_hash = first_metadata['sha256'].lower() - - # Process each file - for file_path in file_paths: - # Skip files that don't exist - if not os.path.exists(file_path): - continue - - # Calculate actual hash - try: - from .file_utils import calculate_sha256 - actual_hash = await calculate_sha256(file_path) - - # Get metadata - metadata_path = os.path.splitext(file_path)[0] + '.metadata.json' - metadata = await ModelRouteUtils.load_local_metadata(metadata_path) - - # Compare hashes - stored_hash = metadata.get('sha256', '').lower(); - - # Set expected hash from first file if not yet set - if not expected_hash: - expected_hash = stored_hash; - - # Check if hash matches expected hash - if actual_hash != expected_hash: - results['verified_as_duplicates'] = False - results['mismatched_files'].append(file_path) - results['new_hash_map'][file_path] = actual_hash - - # Check if stored hash needs updating - if actual_hash != stored_hash: - # Update metadata with actual hash - metadata['sha256'] = actual_hash - - # Save updated metadata - await MetadataManager.save_metadata(file_path, metadata) - - # Update cache - await scanner.update_single_model_cache(file_path, file_path, metadata) - except Exception as e: - logger.error(f"Error verifying hash for {file_path}: {e}") - results['mismatched_files'].append(file_path) - results['new_hash_map'][file_path] = "error_calculating_hash" - results['verified_as_duplicates'] = False - - return web.json_response({ - 'success': True, - **results - }) - - except Exception as e: - logger.error(f"Error verifying duplicate models: {e}", exc_info=True) - return web.json_response({ - 'success': False, - 'error': str(e) - }, status=500) - - @staticmethod - async def handle_save_metadata(request: web.Request, scanner) -> web.Response: - """Handle saving metadata updates - - Args: - request: The aiohttp request - scanner: The model scanner instance - - Returns: - web.Response: The HTTP response - """ - try: - data = await request.json() - file_path = data.get('file_path') - if not file_path: - return web.Response(text='File path is required', status=400) - - # Remove file path from data to avoid saving it - metadata_updates = {k: v for k, v in data.items() if k != 'file_path'} - - # Get metadata file path - metadata_path = os.path.splitext(file_path)[0] + '.metadata.json' - - # Load existing metadata - metadata = await ModelRouteUtils.load_local_metadata(metadata_path) - - # Handle nested updates (for civitai.trainedWords) - for key, value in metadata_updates.items(): - if isinstance(value, dict) and key in metadata and isinstance(metadata[key], dict): - # Deep update for nested dictionaries - for nested_key, nested_value in value.items(): - metadata[key][nested_key] = nested_value - else: - # Regular update for top-level keys - metadata[key] = value - - # Save updated metadata - await MetadataManager.save_metadata(file_path, metadata) - - # Update cache - await scanner.update_single_model_cache(file_path, file_path, metadata) - - # If model_name was updated, resort the cache - if 'model_name' in metadata_updates: - cache = await scanner.get_cached_data() - await cache.resort() - - return web.json_response({'success': True}) - - except Exception as e: - logger.error(f"Error saving metadata: {e}", exc_info=True) - return web.Response(text=str(e), status=500) - - @staticmethod - async def handle_add_tags(request: web.Request, scanner) -> web.Response: - """Handle adding tags to model metadata - - Args: - request: The aiohttp request - scanner: The model scanner instance - - Returns: - web.Response: The HTTP response - """ - try: - data = await request.json() - file_path = data.get('file_path') - new_tags = data.get('tags', []) - - if not file_path: - return web.Response(text='File path is required', status=400) - - if not isinstance(new_tags, list): - return web.Response(text='Tags must be a list', status=400) - - # Get metadata file path - metadata_path = os.path.splitext(file_path)[0] + '.metadata.json' - - # Load existing metadata - metadata = await ModelRouteUtils.load_local_metadata(metadata_path) - - # Get existing tags (case insensitive) - existing_tags = metadata.get('tags', []) - existing_tags_lower = [tag.lower() for tag in existing_tags] - - # Add new tags that don't already exist (case insensitive check) - tags_added = [] - for tag in new_tags: - if isinstance(tag, str) and tag.strip(): - tag_stripped = tag.strip() - if tag_stripped.lower() not in existing_tags_lower: - existing_tags.append(tag_stripped) - existing_tags_lower.append(tag_stripped.lower()) - tags_added.append(tag_stripped) - - # Update metadata with combined tags - metadata['tags'] = existing_tags - - # Save updated metadata - await MetadataManager.save_metadata(file_path, metadata) - - # Update cache - await scanner.update_single_model_cache(file_path, file_path, metadata) - - return web.json_response({ - 'success': True, - 'tags': existing_tags - }) - - except Exception as e: - logger.error(f"Error adding tags: {e}", exc_info=True) - return web.Response(text=str(e), status=500)