feat(cache): add cache health monitoring and validation system, see #730

- Add cache entry validator service for data integrity checks
- Add cache health monitor service for periodic health checks
- Enhance model cache and scanner with validation support
- Update websocket manager for health status broadcasting
- Add initialization banner service for cache health alerts
- Add comprehensive test coverage for new services
- Update translations across all locales
- Refactor sync translation keys script
This commit is contained in:
Will Miao
2026-02-02 08:26:38 +08:00
parent 68cf381b50
commit 778ad8abd2
21 changed files with 1719 additions and 10 deletions

View File

@@ -0,0 +1,259 @@
"""
Cache Entry Validator
Validates and repairs cache entries to prevent runtime errors from
missing or invalid critical fields.
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
import logging
import os
logger = logging.getLogger(__name__)
@dataclass
class ValidationResult:
"""Result of validating a single cache entry."""
is_valid: bool
repaired: bool
errors: List[str] = field(default_factory=list)
entry: Optional[Dict[str, Any]] = None
class CacheEntryValidator:
"""
Validates and repairs cache entry core fields.
Critical fields that cause runtime errors when missing:
- file_path: KeyError in multiple locations
- sha256: KeyError/AttributeError in hash operations
Medium severity fields that may cause sorting/display issues:
- size: KeyError during sorting
- modified: KeyError during sorting
- model_name: AttributeError on .lower() calls
Low severity fields:
- tags: KeyError/TypeError in recipe operations
"""
# Field definitions: (default_value, is_required)
CORE_FIELDS: Dict[str, Tuple[Any, bool]] = {
'file_path': ('', True),
'sha256': ('', True),
'file_name': ('', False),
'model_name': ('', False),
'folder': ('', False),
'size': (0, False),
'modified': (0.0, False),
'tags': ([], False),
'preview_url': ('', False),
'base_model': ('', False),
'from_civitai': (True, False),
'favorite': (False, False),
'exclude': (False, False),
'db_checked': (False, False),
'preview_nsfw_level': (0, False),
'notes': ('', False),
'usage_tips': ('', False),
}
@classmethod
def validate(cls, entry: Dict[str, Any], *, auto_repair: bool = True) -> ValidationResult:
"""
Validate a single cache entry.
Args:
entry: The cache entry dictionary to validate
auto_repair: If True, attempt to repair missing/invalid fields
Returns:
ValidationResult with validation status and optionally repaired entry
"""
if entry is None:
return ValidationResult(
is_valid=False,
repaired=False,
errors=['Entry is None'],
entry=None
)
if not isinstance(entry, dict):
return ValidationResult(
is_valid=False,
repaired=False,
errors=[f'Entry is not a dict: {type(entry).__name__}'],
entry=None
)
errors: List[str] = []
repaired = False
working_entry = dict(entry) if auto_repair else entry
for field_name, (default_value, is_required) in cls.CORE_FIELDS.items():
value = working_entry.get(field_name)
# Check if field is missing or None
if value is None:
if is_required:
errors.append(f"Required field '{field_name}' is missing or None")
if auto_repair:
working_entry[field_name] = cls._get_default_copy(default_value)
repaired = True
continue
# Validate field type and value
field_error = cls._validate_field(field_name, value, default_value)
if field_error:
errors.append(field_error)
if auto_repair:
working_entry[field_name] = cls._get_default_copy(default_value)
repaired = True
# Special validation: file_path must not be empty for required field
file_path = working_entry.get('file_path', '')
if not file_path or (isinstance(file_path, str) and not file_path.strip()):
errors.append("Required field 'file_path' is empty")
# Cannot repair empty file_path - entry is invalid
return ValidationResult(
is_valid=False,
repaired=repaired,
errors=errors,
entry=working_entry if auto_repair else None
)
# Special validation: sha256 must not be empty for required field
sha256 = working_entry.get('sha256', '')
if not sha256 or (isinstance(sha256, str) and not sha256.strip()):
errors.append("Required field 'sha256' is empty")
# Cannot repair empty sha256 - entry is invalid
return ValidationResult(
is_valid=False,
repaired=repaired,
errors=errors,
entry=working_entry if auto_repair else None
)
# Normalize sha256 to lowercase if needed
if isinstance(sha256, str):
normalized_sha = sha256.lower().strip()
if normalized_sha != sha256:
working_entry['sha256'] = normalized_sha
repaired = True
# Determine if entry is valid
# Entry is valid if no critical required field errors remain after repair
# Critical fields are file_path and sha256
CRITICAL_REQUIRED_FIELDS = {'file_path', 'sha256'}
has_critical_errors = any(
"Required field" in error and
any(f"'{field}'" in error for field in CRITICAL_REQUIRED_FIELDS)
for error in errors
)
is_valid = not has_critical_errors
return ValidationResult(
is_valid=is_valid,
repaired=repaired,
errors=errors,
entry=working_entry if auto_repair else entry
)
@classmethod
def validate_batch(
cls,
entries: List[Dict[str, Any]],
*,
auto_repair: bool = True
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Validate a batch of cache entries.
Args:
entries: List of cache entry dictionaries to validate
auto_repair: If True, attempt to repair missing/invalid fields
Returns:
Tuple of (valid_entries, invalid_entries)
"""
if not entries:
return [], []
valid_entries: List[Dict[str, Any]] = []
invalid_entries: List[Dict[str, Any]] = []
for entry in entries:
result = cls.validate(entry, auto_repair=auto_repair)
if result.is_valid:
# Use repaired entry if available, otherwise original
valid_entries.append(result.entry if result.entry else entry)
else:
invalid_entries.append(entry)
# Log invalid entries for debugging
file_path = entry.get('file_path', '<unknown>') if isinstance(entry, dict) else '<not a dict>'
logger.warning(
f"Invalid cache entry for '{file_path}': {', '.join(result.errors)}"
)
return valid_entries, invalid_entries
@classmethod
def _validate_field(cls, field_name: str, value: Any, default_value: Any) -> Optional[str]:
"""
Validate a specific field value.
Returns an error message if invalid, None if valid.
"""
expected_type = type(default_value)
# Special handling for numeric types
if expected_type == int:
if not isinstance(value, (int, float)):
return f"Field '{field_name}' should be numeric, got {type(value).__name__}"
elif expected_type == float:
if not isinstance(value, (int, float)):
return f"Field '{field_name}' should be numeric, got {type(value).__name__}"
elif expected_type == bool:
# Be lenient with boolean fields - accept truthy/falsy values
pass
elif expected_type == str:
if not isinstance(value, str):
return f"Field '{field_name}' should be string, got {type(value).__name__}"
elif expected_type == list:
if not isinstance(value, (list, tuple)):
return f"Field '{field_name}' should be list, got {type(value).__name__}"
return None
@classmethod
def _get_default_copy(cls, default_value: Any) -> Any:
"""Get a copy of the default value to avoid shared mutable state."""
if isinstance(default_value, list):
return list(default_value)
if isinstance(default_value, dict):
return dict(default_value)
return default_value
@classmethod
def get_file_path_safe(cls, entry: Dict[str, Any], default: str = '') -> str:
"""Safely get file_path from an entry."""
if not isinstance(entry, dict):
return default
value = entry.get('file_path')
if isinstance(value, str):
return value
return default
@classmethod
def get_sha256_safe(cls, entry: Dict[str, Any], default: str = '') -> str:
"""Safely get sha256 from an entry."""
if not isinstance(entry, dict):
return default
value = entry.get('sha256')
if isinstance(value, str):
return value.lower()
return default

View File

@@ -0,0 +1,201 @@
"""
Cache Health Monitor
Monitors cache health status and determines when user intervention is needed.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
import logging
from .cache_entry_validator import CacheEntryValidator, ValidationResult
logger = logging.getLogger(__name__)
class CacheHealthStatus(Enum):
"""Health status of the cache."""
HEALTHY = "healthy"
DEGRADED = "degraded"
CORRUPTED = "corrupted"
@dataclass
class HealthReport:
"""Report of cache health check."""
status: CacheHealthStatus
total_entries: int
valid_entries: int
invalid_entries: int
repaired_entries: int
invalid_paths: List[str] = field(default_factory=list)
message: str = ""
@property
def corruption_rate(self) -> float:
"""Calculate the percentage of invalid entries."""
if self.total_entries <= 0:
return 0.0
return self.invalid_entries / self.total_entries
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
'status': self.status.value,
'total_entries': self.total_entries,
'valid_entries': self.valid_entries,
'invalid_entries': self.invalid_entries,
'repaired_entries': self.repaired_entries,
'corruption_rate': f"{self.corruption_rate:.1%}",
'invalid_paths': self.invalid_paths[:10], # Limit to first 10
'message': self.message,
}
class CacheHealthMonitor:
"""
Monitors cache health and determines appropriate status.
Thresholds:
- HEALTHY: 0% invalid entries
- DEGRADED: 0-5% invalid entries (auto-repaired, user should rebuild)
- CORRUPTED: >5% invalid entries (significant data loss likely)
"""
# Threshold percentages
DEGRADED_THRESHOLD = 0.01 # 1% - show warning
CORRUPTED_THRESHOLD = 0.05 # 5% - critical warning
def __init__(
self,
*,
degraded_threshold: float = DEGRADED_THRESHOLD,
corrupted_threshold: float = CORRUPTED_THRESHOLD
):
"""
Initialize the health monitor.
Args:
degraded_threshold: Corruption rate threshold for DEGRADED status
corrupted_threshold: Corruption rate threshold for CORRUPTED status
"""
self.degraded_threshold = degraded_threshold
self.corrupted_threshold = corrupted_threshold
def check_health(
self,
entries: List[Dict[str, Any]],
*,
auto_repair: bool = True
) -> HealthReport:
"""
Check the health of cache entries.
Args:
entries: List of cache entry dictionaries to check
auto_repair: If True, attempt to repair entries during validation
Returns:
HealthReport with status and statistics
"""
if not entries:
return HealthReport(
status=CacheHealthStatus.HEALTHY,
total_entries=0,
valid_entries=0,
invalid_entries=0,
repaired_entries=0,
message="Cache is empty"
)
total_entries = len(entries)
valid_entries: List[Dict[str, Any]] = []
invalid_entries: List[Dict[str, Any]] = []
repaired_count = 0
invalid_paths: List[str] = []
for entry in entries:
result = CacheEntryValidator.validate(entry, auto_repair=auto_repair)
if result.is_valid:
valid_entries.append(result.entry if result.entry else entry)
if result.repaired:
repaired_count += 1
else:
invalid_entries.append(entry)
# Extract file path for reporting
file_path = CacheEntryValidator.get_file_path_safe(entry, '<unknown>')
invalid_paths.append(file_path)
invalid_count = len(invalid_entries)
valid_count = len(valid_entries)
# Determine status based on corruption rate
corruption_rate = invalid_count / total_entries if total_entries > 0 else 0.0
if invalid_count == 0:
status = CacheHealthStatus.HEALTHY
message = "Cache is healthy"
elif corruption_rate >= self.corrupted_threshold:
status = CacheHealthStatus.CORRUPTED
message = (
f"Cache is corrupted: {invalid_count} invalid entries "
f"({corruption_rate:.1%}). Rebuild recommended."
)
elif corruption_rate >= self.degraded_threshold or invalid_count > 0:
status = CacheHealthStatus.DEGRADED
message = (
f"Cache has {invalid_count} invalid entries "
f"({corruption_rate:.1%}). Consider rebuilding cache."
)
else:
# This shouldn't happen, but handle gracefully
status = CacheHealthStatus.HEALTHY
message = "Cache is healthy"
# Log the health check result
if status != CacheHealthStatus.HEALTHY:
logger.warning(
f"Cache health check: {status.value} - "
f"{invalid_count}/{total_entries} invalid, "
f"{repaired_count} repaired"
)
if invalid_paths:
logger.debug(f"Invalid entry paths: {invalid_paths[:5]}")
return HealthReport(
status=status,
total_entries=total_entries,
valid_entries=valid_count,
invalid_entries=invalid_count,
repaired_entries=repaired_count,
invalid_paths=invalid_paths,
message=message
)
def should_notify_user(self, report: HealthReport) -> bool:
"""
Determine if the user should be notified about cache health.
Args:
report: The health report to evaluate
Returns:
True if user should be notified
"""
return report.status != CacheHealthStatus.HEALTHY
def get_notification_severity(self, report: HealthReport) -> str:
"""
Get the severity level for user notification.
Args:
report: The health report to evaluate
Returns:
Severity string: 'warning' or 'error'
"""
if report.status == CacheHealthStatus.CORRUPTED:
return 'error'
return 'warning'

View File

@@ -5,7 +5,6 @@ import logging
logger = logging.getLogger(__name__)
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from operator import itemgetter
from natsort import natsorted
# Supported sort modes: (sort_key, order)
@@ -229,17 +228,17 @@ class ModelCache:
reverse=reverse
)
elif sort_key == 'date':
# Sort by modified timestamp
# Sort by modified timestamp (use .get() with default to handle missing fields)
result = sorted(
data,
key=itemgetter('modified'),
key=lambda x: x.get('modified', 0.0),
reverse=reverse
)
elif sort_key == 'size':
# Sort by file size
# Sort by file size (use .get() with default to handle missing fields)
result = sorted(
data,
key=itemgetter('size'),
key=lambda x: x.get('size', 0),
reverse=reverse
)
elif sort_key == 'usage':

View File

@@ -20,6 +20,8 @@ from .service_registry import ServiceRegistry
from .websocket_manager import ws_manager
from .persistent_model_cache import get_persistent_cache
from .settings_manager import get_settings_manager
from .cache_entry_validator import CacheEntryValidator
from .cache_health_monitor import CacheHealthMonitor, CacheHealthStatus
logger = logging.getLogger(__name__)
@@ -468,6 +470,39 @@ class ModelScanner:
for tag in adjusted_item.get('tags') or []:
tags_count[tag] = tags_count.get(tag, 0) + 1
# Validate cache entries and check health
valid_entries, invalid_entries = CacheEntryValidator.validate_batch(
adjusted_raw_data, auto_repair=True
)
if invalid_entries:
monitor = CacheHealthMonitor()
report = monitor.check_health(adjusted_raw_data, auto_repair=True)
if report.status != CacheHealthStatus.HEALTHY:
# Broadcast health warning to frontend
await ws_manager.broadcast_cache_health_warning(report, page_type)
logger.warning(
f"{self.model_type.capitalize()} Scanner: Cache health issue detected - "
f"{report.invalid_entries} invalid entries, {report.repaired_entries} repaired"
)
# Use only valid entries
adjusted_raw_data = valid_entries
# Rebuild tags count from valid entries only
tags_count = {}
for item in adjusted_raw_data:
for tag in item.get('tags') or []:
tags_count[tag] = tags_count.get(tag, 0) + 1
# Remove invalid entries from hash index
for invalid_entry in invalid_entries:
file_path = CacheEntryValidator.get_file_path_safe(invalid_entry)
sha256 = CacheEntryValidator.get_sha256_safe(invalid_entry)
if file_path:
hash_index.remove_by_path(file_path, sha256)
scan_result = CacheBuildResult(
raw_data=adjusted_raw_data,
hash_index=hash_index,
@@ -776,6 +811,18 @@ class ModelScanner:
model_data = self.adjust_cached_entry(dict(model_data))
if not model_data:
continue
# Validate the new entry before adding
validation_result = CacheEntryValidator.validate(
model_data, auto_repair=True
)
if not validation_result.is_valid:
logger.warning(
f"Skipping invalid entry during reconcile: {path}"
)
continue
model_data = validation_result.entry
self._ensure_license_flags(model_data)
# Add to cache
self._cache.raw_data.append(model_data)
@@ -1090,6 +1137,17 @@ class ModelScanner:
processed_files += 1
if result:
# Validate the entry before adding
validation_result = CacheEntryValidator.validate(
result, auto_repair=True
)
if not validation_result.is_valid:
logger.warning(
f"Skipping invalid scan result: {file_path}"
)
continue
result = validation_result.entry
self._ensure_license_flags(result)
raw_data.append(result)

View File

@@ -255,6 +255,42 @@ class WebSocketManager:
self._download_progress.pop(download_id, None)
logger.debug(f"Cleaned up old download progress for {download_id}")
async def broadcast_cache_health_warning(self, report: 'HealthReport', page_type: str = None):
"""
Broadcast cache health warning to frontend.
Args:
report: HealthReport instance from CacheHealthMonitor
page_type: The page type (loras, checkpoints, embeddings)
"""
from .cache_health_monitor import CacheHealthStatus
# Only broadcast if there are issues
if report.status == CacheHealthStatus.HEALTHY:
return
payload = {
'type': 'cache_health_warning',
'status': report.status.value,
'message': report.message,
'pageType': page_type,
'details': {
'total': report.total_entries,
'valid': report.valid_entries,
'invalid': report.invalid_entries,
'repaired': report.repaired_entries,
'corruption_rate': f"{report.corruption_rate:.1%}",
'invalid_paths': report.invalid_paths[:5], # Limit to first 5
}
}
logger.info(
f"Broadcasting cache health warning: {report.status.value} "
f"({report.invalid_entries} invalid entries)"
)
await self.broadcast(payload)
def get_connected_clients_count(self) -> int:
"""Get number of connected clients"""
return len(self._websockets)