import os import json import jinja2 from aiohttp import web import logging from datetime import datetime, timedelta from collections import defaultdict, Counter from typing import Dict, List, Any from ..config import config from ..services.settings_manager import get_settings_manager from ..services.server_i18n import server_i18n from ..services.service_registry import ServiceRegistry from ..services.model_query import normalize_sub_type, resolve_sub_type from ..utils.constants import VALID_LORA_SUB_TYPES, VALID_CHECKPOINT_SUB_TYPES from ..utils.usage_stats import UsageStats logger = logging.getLogger(__name__) class _SettingsProxy: def __init__(self): self._manager = None def _resolve(self): if self._manager is None: self._manager = get_settings_manager() return self._manager def get(self, *args, **kwargs): return self._resolve().get(*args, **kwargs) def __getattr__(self, item): return getattr(self._resolve(), item) settings = _SettingsProxy() class StatsRoutes: """Route handlers for Statistics page and API endpoints""" def __init__(self): self.lora_scanner = None self.checkpoint_scanner = None self.embedding_scanner = None self.usage_stats = None self.template_env = jinja2.Environment( loader=jinja2.FileSystemLoader(config.templates_path), autoescape=True ) async def init_services(self): """Initialize services from ServiceRegistry""" self.lora_scanner = await ServiceRegistry.get_lora_scanner() self.checkpoint_scanner = await ServiceRegistry.get_checkpoint_scanner() self.embedding_scanner = await ServiceRegistry.get_embedding_scanner() # Only initialize usage stats if we have valid paths configured try: self.usage_stats = UsageStats() except RuntimeError as e: logger.warning(f"Could not initialize usage statistics: {e}") self.usage_stats = None async def handle_stats_page(self, request: web.Request) -> web.Response: """Handle GET /statistics request""" try: # Ensure services are initialized await self.init_services() # Check if scanners are initializing lora_initializing = ( self.lora_scanner._cache is None or (hasattr(self.lora_scanner, 'is_initializing') and self.lora_scanner.is_initializing()) ) checkpoint_initializing = ( self.checkpoint_scanner._cache is None or (hasattr(self.checkpoint_scanner, '_is_initializing') and self.checkpoint_scanner._is_initializing) ) embedding_initializing = ( self.embedding_scanner._cache is None or (hasattr(self.embedding_scanner, 'is_initializing') and self.embedding_scanner.is_initializing()) ) is_initializing = lora_initializing or checkpoint_initializing or embedding_initializing # 获取用户语言设置 settings_object = settings user_language = settings_object.get('language', 'en') settings_manager = settings_object if not isinstance(settings_object, _SettingsProxy) else settings_object._resolve() # 设置服务端i18n语言 server_i18n.set_locale(user_language) # 为模板环境添加i18n过滤器 if not hasattr(self.template_env, '_i18n_filter_added'): self.template_env.filters['t'] = server_i18n.create_template_filter() self.template_env._i18n_filter_added = True template = self.template_env.get_template('statistics.html') rendered = template.render( is_initializing=is_initializing, settings=settings_manager, request=request, t=server_i18n.get_translation, ) return web.Response( text=rendered, content_type='text/html' ) except Exception as e: logger.error(f"Error handling statistics request: {e}", exc_info=True) return web.Response( text="Error loading statistics page", status=500 ) async def get_collection_overview(self, request: web.Request) -> web.Response: """Get collection overview statistics""" try: await self.init_services() # Get LoRA statistics lora_cache = await self.lora_scanner.get_cached_data() lora_count = len(lora_cache.raw_data) lora_size = sum(lora.get('size', 0) for lora in lora_cache.raw_data) # Get Checkpoint statistics checkpoint_cache = await self.checkpoint_scanner.get_cached_data() checkpoint_count = len(checkpoint_cache.raw_data) checkpoint_size = sum(cp.get('size', 0) for cp in checkpoint_cache.raw_data) # Get Embedding statistics embedding_cache = await self.embedding_scanner.get_cached_data() embedding_count = len(embedding_cache.raw_data) embedding_size = sum(emb.get('size', 0) for emb in embedding_cache.raw_data) # Get usage statistics usage_data = await self.usage_stats.get_stats() # CivitAI model type distribution across all model types # Use the same logic as the filter panel: normalize_sub_type(resolve_sub_type(entry)) # with sub-type validation per model type model_types_counter: Counter[str] = Counter() for entry in lora_cache.raw_data: ntype = normalize_sub_type(resolve_sub_type(entry)) if ntype and ntype in VALID_LORA_SUB_TYPES: model_types_counter[ntype] += 1 for entry in checkpoint_cache.raw_data: ntype = normalize_sub_type(resolve_sub_type(entry)) if ntype and ntype in VALID_CHECKPOINT_SUB_TYPES: model_types_counter[ntype] += 1 # Embeddings: always count as "embedding" regardless of CivitAI sub-type model_types_counter['embedding'] = len(embedding_cache.raw_data) return web.json_response({ 'success': True, 'data': { 'total_models': lora_count + checkpoint_count + embedding_count, 'lora_count': lora_count, 'checkpoint_count': checkpoint_count, 'embedding_count': embedding_count, 'total_size': lora_size + checkpoint_size + embedding_size, 'lora_size': lora_size, 'checkpoint_size': checkpoint_size, 'embedding_size': embedding_size, 'total_generations': usage_data.get('total_executions', 0), 'unused_loras': self._count_unused_models(lora_cache.raw_data, usage_data.get('loras', {})), 'unused_checkpoints': self._count_unused_models(checkpoint_cache.raw_data, usage_data.get('checkpoints', {})), 'unused_embeddings': self._count_unused_models(embedding_cache.raw_data, usage_data.get('embeddings', {})), 'model_types_distribution': dict(model_types_counter.most_common()) } }) except Exception as e: logger.error(f"Error getting collection overview: {e}", exc_info=True) return web.json_response({ 'success': False, 'error': str(e) }, status=500) async def get_usage_analytics(self, request: web.Request) -> web.Response: """Get usage analytics data""" try: await self.init_services() # Get usage statistics usage_data = await self.usage_stats.get_stats() # Get model data for enrichment lora_cache = await self.lora_scanner.get_cached_data() checkpoint_cache = await self.checkpoint_scanner.get_cached_data() embedding_cache = await self.embedding_scanner.get_cached_data() # Create hash to model mapping lora_map = {lora['sha256']: lora for lora in lora_cache.raw_data} checkpoint_map = {cp['sha256']: cp for cp in checkpoint_cache.raw_data} embedding_map = {emb['sha256']: emb for emb in embedding_cache.raw_data} # Prepare top used models top_loras = self._get_top_used_models(usage_data.get('loras', {}), lora_map, 10) top_checkpoints = self._get_top_used_models(usage_data.get('checkpoints', {}), checkpoint_map, 10) top_embeddings = self._get_top_used_models(usage_data.get('embeddings', {}), embedding_map, 10) # Prepare usage timeline (last 30 days) timeline = self._get_usage_timeline(usage_data, 30) return web.json_response({ 'success': True, 'data': { 'top_loras': top_loras, 'top_checkpoints': top_checkpoints, 'top_embeddings': top_embeddings, 'usage_timeline': timeline, 'total_executions': usage_data.get('total_executions', 0) } }) except Exception as e: logger.error(f"Error getting usage analytics: {e}", exc_info=True) return web.json_response({ 'success': False, 'error': str(e) }, status=500) async def get_model_usage_list(self, request: web.Request) -> web.Response: """Get paginated model usage list for infinite scrolling""" try: await self.init_services() model_type = request.query.get('type', 'lora') sort_order = request.query.get('sort', 'desc') try: limit = int(request.query.get('limit', '50')) offset = int(request.query.get('offset', '0')) except ValueError: limit = 50 offset = 0 # Get usage statistics usage_data = await self.usage_stats.get_stats() # Select proper cache and usage dict based on type if model_type == 'lora': cache = await self.lora_scanner.get_cached_data() type_usage_data = usage_data.get('loras', {}) elif model_type == 'checkpoint': cache = await self.checkpoint_scanner.get_cached_data() type_usage_data = usage_data.get('checkpoints', {}) elif model_type == 'embedding': cache = await self.embedding_scanner.get_cached_data() type_usage_data = usage_data.get('embeddings', {}) else: return web.json_response({'success': False, 'error': f"Invalid model type: {model_type}"}, status=400) # Create list of all models all_models = [] for item in cache.raw_data: sha256 = item.get('sha256') usage_info = type_usage_data.get(sha256, {}) if sha256 else {} usage_count = usage_info.get('total', 0) if isinstance(usage_info, dict) else 0 all_models.append({ 'name': item.get('model_name', 'Unknown'), 'usage_count': usage_count, 'base_model': item.get('base_model', 'Unknown'), 'preview_url': config.get_preview_static_url(item.get('preview_url', '')), 'folder': item.get('folder', '') }) # Sort the models reverse = (sort_order == 'desc') all_models.sort(key=lambda x: (x['usage_count'], x['name'].lower()), reverse=reverse) if not reverse: # If asc, sort by usage_count ascending, but keep name ascending all_models.sort(key=lambda x: (x['usage_count'], x['name'].lower())) else: all_models.sort(key=lambda x: (-x['usage_count'], x['name'].lower())) # Slice for pagination paginated_models = all_models[offset:offset + limit] return web.json_response({ 'success': True, 'data': { 'items': paginated_models, 'total': len(all_models), 'type': model_type } }) except Exception as e: logger.error(f"Error getting model usage list: {e}", exc_info=True) return web.json_response({ 'success': False, 'error': str(e) }, status=500) async def get_base_model_distribution(self, request: web.Request) -> web.Response: """Get base model distribution statistics""" try: await self.init_services() # Get model data lora_cache = await self.lora_scanner.get_cached_data() checkpoint_cache = await self.checkpoint_scanner.get_cached_data() embedding_cache = await self.embedding_scanner.get_cached_data() # Count by base model lora_base_models = Counter(lora.get('base_model', 'Unknown') for lora in lora_cache.raw_data) checkpoint_base_models = Counter(cp.get('base_model', 'Unknown') for cp in checkpoint_cache.raw_data) embedding_base_models = Counter(emb.get('base_model', 'Unknown') for emb in embedding_cache.raw_data) return web.json_response({ 'success': True, 'data': { 'loras': dict(lora_base_models), 'checkpoints': dict(checkpoint_base_models), 'embeddings': dict(embedding_base_models) } }) except Exception as e: logger.error(f"Error getting base model distribution: {e}", exc_info=True) return web.json_response({ 'success': False, 'error': str(e) }, status=500) async def get_tag_analytics(self, request: web.Request) -> web.Response: """Get tag usage analytics""" try: await self.init_services() # Get model data lora_cache = await self.lora_scanner.get_cached_data() checkpoint_cache = await self.checkpoint_scanner.get_cached_data() embedding_cache = await self.embedding_scanner.get_cached_data() # Count tag frequencies all_tags = [] for lora in lora_cache.raw_data: all_tags.extend(lora.get('tags', [])) for cp in checkpoint_cache.raw_data: all_tags.extend(cp.get('tags', [])) for emb in embedding_cache.raw_data: all_tags.extend(emb.get('tags', [])) tag_counts = Counter(all_tags) # Get top 50 tags top_tags = [{'tag': tag, 'count': count} for tag, count in tag_counts.most_common(50)] return web.json_response({ 'success': True, 'data': { 'top_tags': top_tags, 'total_unique_tags': len(tag_counts) } }) except Exception as e: logger.error(f"Error getting tag analytics: {e}", exc_info=True) return web.json_response({ 'success': False, 'error': str(e) }, status=500) async def get_storage_analytics(self, request: web.Request) -> web.Response: """Get storage usage analytics""" try: await self.init_services() # Get usage statistics usage_data = await self.usage_stats.get_stats() # Get model data lora_cache = await self.lora_scanner.get_cached_data() checkpoint_cache = await self.checkpoint_scanner.get_cached_data() embedding_cache = await self.embedding_scanner.get_cached_data() # Create models with usage data lora_storage = [] for lora in lora_cache.raw_data: usage_count = 0 if lora['sha256'] in usage_data.get('loras', {}): usage_count = usage_data['loras'][lora['sha256']].get('total', 0) lora_storage.append({ 'name': lora['model_name'], 'size': lora.get('size', 0), 'usage_count': usage_count, 'folder': lora.get('folder', ''), 'base_model': lora.get('base_model', 'Unknown') }) checkpoint_storage = [] for cp in checkpoint_cache.raw_data: usage_count = 0 if cp['sha256'] in usage_data.get('checkpoints', {}): usage_count = usage_data['checkpoints'][cp['sha256']].get('total', 0) checkpoint_storage.append({ 'name': cp['model_name'], 'size': cp.get('size', 0), 'usage_count': usage_count, 'folder': cp.get('folder', ''), 'base_model': cp.get('base_model', 'Unknown') }) embedding_storage = [] for emb in embedding_cache.raw_data: usage_count = 0 if emb['sha256'] in usage_data.get('embeddings', {}): usage_count = usage_data['embeddings'][emb['sha256']].get('total', 0) embedding_storage.append({ 'name': emb['model_name'], 'size': emb.get('size', 0), 'usage_count': usage_count, 'folder': emb.get('folder', ''), 'base_model': emb.get('base_model', 'Unknown') }) # Sort by size lora_storage.sort(key=lambda x: x['size'], reverse=True) checkpoint_storage.sort(key=lambda x: x['size'], reverse=True) embedding_storage.sort(key=lambda x: x['size'], reverse=True) return web.json_response({ 'success': True, 'data': { 'loras': lora_storage[:20], # Top 20 by size 'checkpoints': checkpoint_storage[:20], 'embeddings': embedding_storage[:20] } }) except Exception as e: logger.error(f"Error getting storage analytics: {e}", exc_info=True) return web.json_response({ 'success': False, 'error': str(e) }, status=500) async def get_insights(self, request: web.Request) -> web.Response: """Get smart insights about the collection""" try: await self.init_services() # Get usage statistics usage_data = await self.usage_stats.get_stats() # Get model data lora_cache = await self.lora_scanner.get_cached_data() checkpoint_cache = await self.checkpoint_scanner.get_cached_data() embedding_cache = await self.embedding_scanner.get_cached_data() insights = [] # Calculate unused models unused_loras = self._count_unused_models(lora_cache.raw_data, usage_data.get('loras', {})) unused_checkpoints = self._count_unused_models(checkpoint_cache.raw_data, usage_data.get('checkpoints', {})) unused_embeddings = self._count_unused_models(embedding_cache.raw_data, usage_data.get('embeddings', {})) total_loras = len(lora_cache.raw_data) total_checkpoints = len(checkpoint_cache.raw_data) total_embeddings = len(embedding_cache.raw_data) if total_loras > 0: unused_lora_percent = (unused_loras / total_loras) * 100 if unused_lora_percent > 50: insights.append({ 'type': 'warning', 'key': 'insights.unusedLoras.high', 'params': { 'percent': f'{unused_lora_percent:.1f}', 'count': str(unused_loras), 'total': str(total_loras) } }) if total_checkpoints > 0: unused_checkpoint_percent = (unused_checkpoints / total_checkpoints) * 100 if unused_checkpoint_percent > 30: insights.append({ 'type': 'warning', 'key': 'insights.unusedCheckpoints.detected', 'params': { 'percent': f'{unused_checkpoint_percent:.1f}', 'count': str(unused_checkpoints), 'total': str(total_checkpoints) } }) if total_embeddings > 0: unused_embedding_percent = (unused_embeddings / total_embeddings) * 100 if unused_embedding_percent > 50: insights.append({ 'type': 'warning', 'key': 'insights.unusedEmbeddings.high', 'params': { 'percent': f'{unused_embedding_percent:.1f}', 'count': str(unused_embeddings), 'total': str(total_embeddings) } }) # Storage insights total_size = sum(lora.get('size', 0) for lora in lora_cache.raw_data) + \ sum(cp.get('size', 0) for cp in checkpoint_cache.raw_data) + \ sum(emb.get('size', 0) for emb in embedding_cache.raw_data) if total_size > 100 * 1024 * 1024 * 1024: # 100GB insights.append({ 'type': 'info', 'key': 'insights.collection.large', 'params': { 'size': self._format_size(total_size) } }) # Recent activity insight if usage_data.get('total_executions', 0) > 100: insights.append({ 'type': 'success', 'key': 'insights.activity.active', 'params': { 'count': str(usage_data['total_executions']) } }) return web.json_response({ 'success': True, 'data': { 'insights': insights } }) except Exception as e: logger.error(f"Error getting insights: {e}", exc_info=True) return web.json_response({ 'success': False, 'error': str(e) }, status=500) def _count_unused_models(self, models: List[Dict], usage_data: Dict) -> int: """Count models that have never been used""" used_hashes = set(usage_data.keys()) unused_count = 0 for model in models: if model.get('sha256') not in used_hashes: unused_count += 1 return unused_count def _get_top_used_models(self, usage_data: Dict, model_map: Dict, limit: int) -> List[Dict]: """Get top used models with their metadata""" sorted_usage = sorted(usage_data.items(), key=lambda x: x[1].get('total', 0), reverse=True) top_models = [] for sha256, usage_info in sorted_usage[:limit]: if sha256 in model_map: model = model_map[sha256] top_models.append({ 'name': model['model_name'], 'usage_count': usage_info.get('total', 0), 'base_model': model.get('base_model', 'Unknown'), 'preview_url': config.get_preview_static_url(model.get('preview_url', '')), 'folder': model.get('folder', '') }) return top_models def _get_usage_timeline(self, usage_data: Dict, days: int) -> List[Dict]: """Get usage timeline for the past N days""" timeline = [] today = datetime.now() for i in range(days): date = today - timedelta(days=i) date_str = date.strftime('%Y-%m-%d') lora_usage = 0 checkpoint_usage = 0 embedding_usage = 0 # Count usage for this date for model_usage in usage_data.get('loras', {}).values(): if isinstance(model_usage, dict) and 'history' in model_usage: lora_usage += model_usage['history'].get(date_str, 0) for model_usage in usage_data.get('checkpoints', {}).values(): if isinstance(model_usage, dict) and 'history' in model_usage: checkpoint_usage += model_usage['history'].get(date_str, 0) for model_usage in usage_data.get('embeddings', {}).values(): if isinstance(model_usage, dict) and 'history' in model_usage: embedding_usage += model_usage['history'].get(date_str, 0) timeline.append({ 'date': date_str, 'lora_usage': lora_usage, 'checkpoint_usage': checkpoint_usage, 'embedding_usage': embedding_usage, 'total_usage': lora_usage + checkpoint_usage + embedding_usage }) return list(reversed(timeline)) # Oldest to newest def _format_size(self, size_bytes: int) -> str: """Format file size in human readable format""" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size_bytes < 1024.0: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.1f} PB" def setup_routes(self, app: web.Application): """Register routes with the application""" # Add an app startup handler to initialize services app.on_startup.append(self._on_startup) # Register page route app.router.add_get('/statistics', self.handle_stats_page) # Register API routes app.router.add_get('/api/lm/stats/collection-overview', self.get_collection_overview) app.router.add_get('/api/lm/stats/usage-analytics', self.get_usage_analytics) app.router.add_get('/api/lm/stats/model-usage-list', self.get_model_usage_list) app.router.add_get('/api/lm/stats/base-model-distribution', self.get_base_model_distribution) app.router.add_get('/api/lm/stats/tag-analytics', self.get_tag_analytics) app.router.add_get('/api/lm/stats/storage-analytics', self.get_storage_analytics) app.router.add_get('/api/lm/stats/insights', self.get_insights) async def _on_startup(self, app): """Initialize services when the app starts""" await self.init_services()