feat(metadata): implement metadata archive management and update settings for metadata providers

This commit is contained in:
Will Miao
2025-09-08 13:17:16 +08:00
parent 9ba3e2c204
commit 821827a375
11 changed files with 659 additions and 38 deletions

View File

@@ -0,0 +1,150 @@
import zipfile
import aiohttp
import logging
import asyncio
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
class MetadataArchiveManager:
    """Manages downloading and extracting Civitai metadata archive database.

    The archive is downloaded to ``<base_path>/civitai.zip`` and extracted
    into ``base_path``; the database is then looked up at
    ``<base_path>/civitai/civitai.sqlite``.
    """

    # Mirrors, tried in order until one yields a valid zip archive.
    DOWNLOAD_URLS = [
        "https://github.com/willmiao/civitai-metadata-archive-db/releases/download/db-2025-08-08/civitai.zip",
        # Hugging Face serves the raw file from /resolve/<rev>/; the
        # /blob/<rev>/ form is the HTML file-viewer page, not the archive.
        "https://huggingface.co/datasets/willmiao/civitai-metadata-archive-db/resolve/main/civitai.zip",
    ]

    def __init__(self, base_path: str):
        """Initialize with base path where files will be stored.

        Args:
            base_path: Directory that holds the downloaded archive and the
                extracted ``civitai`` folder.
        """
        self.base_path = Path(base_path)
        self.civitai_folder = self.base_path / "civitai"
        self.archive_path = self.base_path / "civitai.zip"
        self.db_path = self.civitai_folder / "civitai.sqlite"

    def is_database_available(self) -> bool:
        """Check if the SQLite database file exists and is non-empty."""
        # A single stat() avoids the exists()/stat() race if the file is
        # removed between the two calls.
        try:
            return self.db_path.stat().st_size > 0
        except OSError:
            return False

    def get_database_path(self) -> Optional[str]:
        """Get the path to the SQLite database, or None if unavailable."""
        if self.is_database_available():
            return str(self.db_path)
        return None

    async def download_and_extract_database(self, progress_callback=None) -> bool:
        """Download and extract the metadata archive database.

        Args:
            progress_callback: Optional callable invoked as
                ``progress_callback(stage, message)`` where ``stage`` is
                ``"download"`` or ``"extract"``.

        Returns:
            bool: True if the database is available afterwards, False
            otherwise. Errors are logged, not raised.
        """
        try:
            # Create directories if they don't exist
            self.base_path.mkdir(parents=True, exist_ok=True)
            self.civitai_folder.mkdir(parents=True, exist_ok=True)

            try:
                if not await self._download_archive(progress_callback):
                    return False
                if not await self._extract_archive(progress_callback):
                    return False
            finally:
                # The zip is only a transport artifact; drop it on failure
                # too so a partial or corrupt file does not linger on disk.
                self._discard_archive()

            # Do not report success unless the expected file really exists.
            if not self.is_database_available():
                logger.error(f"Archive extracted but database not found at {self.db_path}")
                return False

            logger.info(f"Successfully downloaded and extracted metadata database to {self.db_path}")
            return True
        except Exception as e:
            logger.error(f"Error downloading and extracting metadata database: {e}", exc_info=True)
            return False

    def _discard_archive(self) -> None:
        """Delete the downloaded zip archive if it exists."""
        try:
            self.archive_path.unlink()
        except FileNotFoundError:
            pass

    async def _download_archive(self, progress_callback=None) -> bool:
        """Download the zip archive from one of the available URLs.

        Each URL in ``DOWNLOAD_URLS`` is tried in order. A mirror counts as
        successful only when it answers HTTP 200 and the saved file is a
        valid zip archive; otherwise the next mirror is tried.

        Returns:
            bool: True once a valid archive is stored at ``archive_path``,
            False if every mirror failed.
        """
        # aiohttp's default total timeout (5 minutes) would abort a large
        # download on slow links; bound connect and read stalls instead.
        timeout = aiohttp.ClientTimeout(total=None, sock_connect=30, sock_read=120)

        for url in self.DOWNLOAD_URLS:
            try:
                logger.info(f"Attempting to download from {url}")
                if progress_callback:
                    progress_callback("download", f"Downloading from {url}")

                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.get(url) as response:
                        if response.status != 200:
                            logger.warning(f"Failed to download from {url}: HTTP {response.status}")
                            continue

                        total_size = int(response.headers.get('content-length', 0))
                        downloaded = 0
                        with open(self.archive_path, 'wb') as f:
                            async for chunk in response.content.iter_chunked(8192):
                                f.write(chunk)
                                downloaded += len(chunk)
                                # Percentage is only meaningful when the
                                # server announced a content length.
                                if progress_callback and total_size > 0:
                                    percentage = (downloaded / total_size) * 100
                                    progress_callback("download", f"Downloaded {percentage:.1f}%")

                # A 200 response can still be an HTML page or a truncated
                # file; fall through to the next mirror in that case.
                if not zipfile.is_zipfile(self.archive_path):
                    logger.warning(f"Downloaded file from {url} is not a valid zip archive")
                    continue

                logger.info(f"Successfully downloaded archive from {url}")
                return True
            except Exception as e:
                logger.warning(f"Error downloading from {url}: {e}")
                continue

        logger.error("Failed to download archive from any URL")
        return False

    async def _extract_archive(self, progress_callback=None) -> bool:
        """Extract the zip archive into ``base_path``.

        Returns:
            bool: True on success, False if extraction raised (logged).
        """
        try:
            if progress_callback:
                progress_callback("extract", "Extracting archive...")

            # Run extraction in thread pool to avoid blocking the event loop
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._extract_zip_sync)

            if progress_callback:
                progress_callback("extract", "Extraction completed")
            return True
        except Exception as e:
            logger.error(f"Error extracting archive: {e}", exc_info=True)
            return False

    def _extract_zip_sync(self):
        """Synchronous zip extraction (runs in thread pool)."""
        with zipfile.ZipFile(self.archive_path, 'r') as archive:
            archive.extractall(path=self.base_path)

    async def remove_database(self) -> bool:
        """Remove the metadata database folder and any leftover archive.

        Returns:
            bool: True on success, False if removal raised (logged).
        """
        import shutil

        try:
            if self.civitai_folder.exists():
                # rmtree also handles nested directories, which a plain
                # unlink-files-then-rmdir pass would fail on half way.
                shutil.rmtree(self.civitai_folder)

            # Also remove the archive file if it exists
            if self.archive_path.exists():
                self.archive_path.unlink()

            logger.info("Successfully removed metadata database")
            return True
        except Exception as e:
            logger.error(f"Error removing metadata database: {e}", exc_info=True)
            return False

View File

@@ -1,28 +1,97 @@
import os
import logging
from .model_metadata_provider import ModelMetadataProviderManager, SQLiteModelMetadataProvider
from .model_metadata_provider import (
ModelMetadataProviderManager,
SQLiteModelMetadataProvider,
CivitaiModelMetadataProvider,
FallbackMetadataProvider
)
from .settings_manager import settings
from .metadata_archive_manager import MetadataArchiveManager
from .service_registry import ServiceRegistry
logger = logging.getLogger(__name__)
async def initialize_metadata_providers():
# NOTE(review): this span looks like a diff rendering with indentation
# stripped and the pre-change and post-change lines interleaved (two
# docstrings, two SQLite registration sections) - confirm which lines
# belong to the current version before editing the code.
"""Initialize and configure all metadata providers"""
"""Initialize and configure all metadata providers based on settings"""
provider_manager = await ModelMetadataProviderManager.get_instance()
# Use hardcoded SQLite DB path if not set in settings
# The path is <three directory levels above this file>/civitai/civitai.sqlite.
db_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
'civitai', 'civitai.sqlite'
)
if db_path and os.path.exists(db_path):
try:
sqlite_provider = SQLiteModelMetadataProvider(db_path)
provider_manager.register_provider('sqlite', sqlite_provider)
logger.info(f"SQLite metadata provider registered with database: {db_path}")
except Exception as e:
logger.error(f"Failed to initialize SQLite metadata provider: {e}")
# Get settings
# The archive DB is off unless 'enable_metadata_archive_db' is set;
# 'metadata_provider_priority' falls back to 'archive_db'.
enable_archive_db = settings.get('enable_metadata_archive_db', False)
priority = settings.get('metadata_provider_priority', 'archive_db')
# (name, provider) pairs for every provider created successfully below;
# used afterwards to decide which provider becomes the default.
providers = []
# Initialize archive database provider if enabled
if enable_archive_db:
# Initialize archive manager
# Same base directory as above: three levels above this file.
base_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
archive_manager = MetadataArchiveManager(base_path)
# None when the database file is missing or empty.
db_path = archive_manager.get_database_path()
if db_path:
try:
sqlite_provider = SQLiteModelMetadataProvider(db_path)
provider_manager.register_provider('sqlite', sqlite_provider)
providers.append(('sqlite', sqlite_provider))
logger.info(f"SQLite metadata provider registered with database: {db_path}")
except Exception as e:
logger.error(f"Failed to initialize SQLite metadata provider: {e}")
else:
logger.warning("Metadata archive database is enabled but not available")
# Initialize Civitai API provider
# A failure here is logged and leaves the API provider out of `providers`.
try:
civitai_client = await ServiceRegistry.get_civitai_client()
civitai_provider = CivitaiModelMetadataProvider(civitai_client)
provider_manager.register_provider('civitai_api', civitai_provider)
providers.append(('civitai_api', civitai_provider))
logger.info("Civitai API metadata provider registered")
except Exception as e:
logger.error(f"Failed to initialize Civitai API metadata provider: {e}")
# Set up fallback provider based on priority
if len(providers) > 1:
# Order providers based on priority setting
# Any priority value other than 'archive_db' puts the Civitai API first.
if priority == 'archive_db':
# Archive DB first, then Civitai API
ordered_providers = [p[1] for p in providers if p[0] == 'sqlite'] + [p[1] for p in providers if p[0] == 'civitai_api']
else:
# Civitai API first, then Archive DB
ordered_providers = [p[1] for p in providers if p[0] == 'civitai_api'] + [p[1] for p in providers if p[0] == 'sqlite']
if ordered_providers:
fallback_provider = FallbackMetadataProvider(ordered_providers)
provider_manager.register_provider('fallback', fallback_provider, is_default=True)
logger.info(f"Fallback metadata provider registered with priority: {priority}")
elif len(providers) == 1:
# Only one provider available, set it as default
# The provider is registered a second time, now with is_default=True.
provider_name, provider = providers[0]
provider_manager.register_provider(provider_name, provider, is_default=True)
logger.info(f"Single metadata provider registered as default: {provider_name}")
else:
logger.warning("No metadata providers available")
return provider_manager
async def update_metadata_provider_priority():
    """Update metadata provider priority based on current settings.

    Re-runs ``initialize_metadata_providers()`` so the providers are
    registered again from the current settings, then logs the configured
    priority.
    """
    # Read only for the log message below; initialize_metadata_providers()
    # fetches the provider manager and the settings itself.
    priority = settings.get('metadata_provider_priority', 'archive_db')

    # Rebuild providers with new priority
    await initialize_metadata_providers()
    logger.info(f"Updated metadata provider priority to: {priority}")
async def get_metadata_archive_manager():
    """Build a MetadataArchiveManager rooted three directories above this file."""
    root_dir = __file__
    # Walk up three levels: file -> its folder -> parent -> grandparent.
    for _ in range(3):
        root_dir = os.path.dirname(root_dir)
    return MetadataArchiveManager(root_dir)
async def get_metadata_provider(provider_name: str = None):
"""Get a specific metadata provider or default provider"""
provider_manager = await ModelMetadataProviderManager.get_instance()

View File

@@ -297,7 +297,8 @@ class FallbackMetadataProvider(ModelMetadataProvider):
result = await provider.get_model_versions(model_id)
if result:
return result
except Exception:
except Exception as e:
logger.debug(f"Provider failed for get_model_versions: {e}")
continue
return None
@@ -307,27 +308,30 @@ class FallbackMetadataProvider(ModelMetadataProvider):
result = await provider.get_model_version(model_id, version_id)
if result:
return result
except Exception:
except Exception as e:
logger.debug(f"Provider failed for get_model_version: {e}")
continue
return None
async def get_model_version_info(self, version_id: str) -> Tuple[Optional[Dict], Optional[str]]:
# Asks each provider in self.providers in order and returns the first
# truthy result together with the error value that provider returned.
# A provider that raises is skipped; when no provider yields a result the
# method returns None plus a message string.
# NOTE(review): several lines appear in two variants (err/error, the two
# except clauses, the two final returns) - this looks like a diff rendering
# with old and new lines interleaved; confirm which variant is current.
for provider in self.providers:
try:
result, err = await provider.get_model_version_info(version_id)
result, error = await provider.get_model_version_info(version_id)
if result:
return result, err
except Exception:
return result, error
except Exception as e:
logger.debug(f"Provider failed for get_model_version_info: {e}")
continue
return None, "Not found in any provider"
return None, "No provider could retrieve the data"
async def get_model_metadata(self, model_id: str) -> Tuple[Optional[Dict], int]:
# Asks each provider in self.providers in order and returns the first
# truthy result together with the integer that provider returned alongside
# it. A provider that raises is skipped; when no provider yields a result
# the method returns (None, 404).
# NOTE(review): several lines appear in two variants (code/status, the two
# except clauses) - this looks like a diff rendering with old and new lines
# interleaved; confirm which variant is current.
for provider in self.providers:
try:
result, code = await provider.get_model_metadata(model_id)
result, status = await provider.get_model_metadata(model_id)
if result:
return result, code
except Exception:
return result, status
except Exception as e:
logger.debug(f"Provider failed for get_model_metadata: {e}")
continue
return None, 404

View File

@@ -81,7 +81,9 @@ class SettingsManager:
return {
"civitai_api_key": "",
"show_only_sfw": False,
"language": "en" # 添加默认语言设置
"language": "en", # 添加默认语言设置
"enable_metadata_archive_db": False, # Enable metadata archive database
"metadata_provider_priority": "archive_db" # Default priority: 'archive_db' or 'civitai_api'
}
def get(self, key: str, default: Any = None) -> Any: