Refactor download logic to use unified downloader service

- Introduced a new `Downloader` class to centralize HTTP/HTTPS download management.
- Replaced direct `aiohttp` session handling with the unified downloader in `MetadataArchiveManager`, `DownloadManager`, and `ExampleImagesProcessor`.
- Added support for resumable downloads, progress tracking, and error handling in the new downloader.
- Updated methods to utilize the downloader's capabilities for downloading files and images, improving code maintainability and readability.
This commit is contained in:
Will Miao
2025-09-09 10:34:14 +08:00
parent 821827a375
commit 14721c265f
6 changed files with 777 additions and 514 deletions

View File

@@ -3,13 +3,13 @@ import os
import asyncio
import json
import time
import aiohttp
from aiohttp import web
from ..services.service_registry import ServiceRegistry
from ..utils.metadata_manager import MetadataManager
from .example_images_processor import ExampleImagesProcessor
from .example_images_metadata import MetadataUpdater
from ..services.websocket_manager import ws_manager # Add this import at the top
from ..services.downloader import get_downloader
logger = logging.getLogger(__name__)
@@ -199,19 +199,8 @@ class DownloadManager:
"""Download example images for all models"""
global is_downloading, download_progress
# Create independent download session
connector = aiohttp.TCPConnector(
ssl=True,
limit=3,
force_close=False,
enable_cleanup_closed=True
)
timeout = aiohttp.ClientTimeout(total=None, connect=60, sock_read=60)
independent_session = aiohttp.ClientSession(
connector=connector,
trust_env=True,
timeout=timeout
)
# Get unified downloader
downloader = await get_downloader()
try:
# Get scanners
@@ -246,7 +235,7 @@ class DownloadManager:
# Main logic for processing model is here, but actual operations are delegated to other classes
was_remote_download = await DownloadManager._process_model(
scanner_type, model, scanner,
output_dir, optimize, independent_session
output_dir, optimize, downloader
)
# Update progress
@@ -270,12 +259,6 @@ class DownloadManager:
download_progress['end_time'] = time.time()
finally:
# Close the independent session
try:
await independent_session.close()
except Exception as e:
logger.error(f"Error closing download session: {e}")
# Save final progress to file
try:
DownloadManager._save_progress(output_dir)
@@ -286,7 +269,7 @@ class DownloadManager:
is_downloading = False
@staticmethod
async def _process_model(scanner_type, model, scanner, output_dir, optimize, independent_session):
async def _process_model(scanner_type, model, scanner, output_dir, optimize, downloader):
"""Process a single model download"""
global download_progress
@@ -347,7 +330,7 @@ class DownloadManager:
images = model.get('civitai', {}).get('images', [])
success, is_stale = await ExampleImagesProcessor.download_model_images(
model_hash, model_name, images, model_dir, optimize, independent_session
model_hash, model_name, images, model_dir, optimize, downloader
)
# If metadata is stale, try to refresh it
@@ -365,7 +348,7 @@ class DownloadManager:
# Retry download with updated metadata
updated_images = updated_model.get('civitai', {}).get('images', [])
success, _ = await ExampleImagesProcessor.download_model_images(
model_hash, model_name, updated_images, model_dir, optimize, independent_session
model_hash, model_name, updated_images, model_dir, optimize, downloader
)
download_progress['refreshed_models'].add(model_hash)
@@ -529,19 +512,8 @@ class DownloadManager:
"""Download example images for specific models only - synchronous version"""
global download_progress
# Create independent download session
connector = aiohttp.TCPConnector(
ssl=True,
limit=3,
force_close=False,
enable_cleanup_closed=True
)
timeout = aiohttp.ClientTimeout(total=None, connect=60, sock_read=60)
independent_session = aiohttp.ClientSession(
connector=connector,
trust_env=True,
timeout=timeout
)
# Get unified downloader
downloader = await get_downloader()
try:
# Get scanners
@@ -586,7 +558,7 @@ class DownloadManager:
# Force process this model regardless of previous status
was_successful = await DownloadManager._process_specific_model(
scanner_type, model, scanner,
output_dir, optimize, independent_session
output_dir, optimize, downloader
)
if was_successful:
@@ -650,14 +622,11 @@ class DownloadManager:
raise
finally:
# Close the independent session
try:
await independent_session.close()
except Exception as e:
logger.error(f"Error closing download session: {e}")
# No need to close any sessions since we use the global downloader
pass
@staticmethod
async def _process_specific_model(scanner_type, model, scanner, output_dir, optimize, independent_session):
async def _process_specific_model(scanner_type, model, scanner, output_dir, optimize, downloader):
"""Process a specific model for forced download, ignoring previous download status"""
global download_progress
@@ -701,7 +670,7 @@ class DownloadManager:
images = model.get('civitai', {}).get('images', [])
success, is_stale, failed_images = await ExampleImagesProcessor.download_model_images_with_tracking(
model_hash, model_name, images, model_dir, optimize, independent_session
model_hash, model_name, images, model_dir, optimize, downloader
)
# If metadata is stale, try to refresh it
@@ -719,7 +688,7 @@ class DownloadManager:
# Retry download with updated metadata
updated_images = updated_model.get('civitai', {}).get('images', [])
success, _, additional_failed_images = await ExampleImagesProcessor.download_model_images_with_tracking(
model_hash, model_name, updated_images, model_dir, optimize, independent_session
model_hash, model_name, updated_images, model_dir, optimize, downloader
)
# Combine failed images from both attempts

View File

@@ -35,7 +35,7 @@ class ExampleImagesProcessor:
return image_url
@staticmethod
async def download_model_images(model_hash, model_name, model_images, model_dir, optimize, independent_session):
async def download_model_images(model_hash, model_name, model_images, model_dir, optimize, downloader):
"""Download images for a single model
Returns:
@@ -78,23 +78,25 @@ class ExampleImagesProcessor:
try:
logger.debug(f"Downloading {save_filename} for {model_name}")
# Download directly using the independent session
async with independent_session.get(image_url, timeout=60) as response:
if response.status == 200:
with open(save_path, 'wb') as f:
async for chunk in response.content.iter_chunked(8192):
if chunk:
f.write(chunk)
elif response.status == 404:
error_msg = f"Failed to download file: {image_url}, status code: 404 - Model metadata might be stale"
logger.warning(error_msg)
model_success = False # Mark the model as failed due to 404 error
# Return early to trigger metadata refresh attempt
return False, True # (success, is_metadata_stale)
else:
error_msg = f"Failed to download file: {image_url}, status code: {response.status}"
logger.warning(error_msg)
model_success = False # Mark the model as failed
# Download using the unified downloader
success, content = await downloader.download_to_memory(
image_url,
use_auth=False # Example images don't need auth
)
if success:
with open(save_path, 'wb') as f:
f.write(content)
elif "404" in str(content):
error_msg = f"Failed to download file: {image_url}, status code: 404 - Model metadata might be stale"
logger.warning(error_msg)
model_success = False # Mark the model as failed due to 404 error
# Return early to trigger metadata refresh attempt
return False, True # (success, is_metadata_stale)
else:
error_msg = f"Failed to download file: {image_url}, error: {content}"
logger.warning(error_msg)
model_success = False # Mark the model as failed
except Exception as e:
error_msg = f"Error downloading file {image_url}: {str(e)}"
logger.error(error_msg)
@@ -103,7 +105,7 @@ class ExampleImagesProcessor:
return model_success, False # (success, is_metadata_stale)
@staticmethod
async def download_model_images_with_tracking(model_hash, model_name, model_images, model_dir, optimize, independent_session):
async def download_model_images_with_tracking(model_hash, model_name, model_images, model_dir, optimize, downloader):
"""Download images for a single model with tracking of failed image URLs
Returns:
@@ -147,25 +149,27 @@ class ExampleImagesProcessor:
try:
logger.debug(f"Downloading {save_filename} for {model_name}")
# Download directly using the independent session
async with independent_session.get(image_url, timeout=60) as response:
if response.status == 200:
with open(save_path, 'wb') as f:
async for chunk in response.content.iter_chunked(8192):
if chunk:
f.write(chunk)
elif response.status == 404:
error_msg = f"Failed to download file: {image_url}, status code: 404 - Model metadata might be stale"
logger.warning(error_msg)
model_success = False # Mark the model as failed due to 404 error
failed_images.append(image_url) # Track failed URL
# Return early to trigger metadata refresh attempt
return False, True, failed_images # (success, is_metadata_stale, failed_images)
else:
error_msg = f"Failed to download file: {image_url}, status code: {response.status}"
logger.warning(error_msg)
model_success = False # Mark the model as failed
failed_images.append(image_url) # Track failed URL
# Download using the unified downloader
success, content = await downloader.download_to_memory(
image_url,
use_auth=False # Example images don't need auth
)
if success:
with open(save_path, 'wb') as f:
f.write(content)
elif "404" in str(content):
error_msg = f"Failed to download file: {image_url}, status code: 404 - Model metadata might be stale"
logger.warning(error_msg)
model_success = False # Mark the model as failed due to 404 error
failed_images.append(image_url) # Track failed URL
# Return early to trigger metadata refresh attempt
return False, True, failed_images # (success, is_metadata_stale, failed_images)
else:
error_msg = f"Failed to download file: {image_url}, error: {content}"
logger.warning(error_msg)
model_success = False # Mark the model as failed
failed_images.append(image_url) # Track failed URL
except Exception as e:
error_msg = f"Error downloading file {image_url}: {str(e)}"
logger.error(error_msg)