diff --git a/py/services/civitai_client.py b/py/services/civitai_client.py index 1613776d..a390ef83 100644 --- a/py/services/civitai_client.py +++ b/py/services/civitai_client.py @@ -49,8 +49,8 @@ class CivitaiClient: enable_cleanup_closed=True ) trust_env = True # Allow using system environment proxy settings - # Configure timeout parameters - increase read timeout for large files - timeout = aiohttp.ClientTimeout(total=None, connect=60, sock_read=120) + # Configure timeout parameters - increase read timeout for large files and remove sock_read timeout + timeout = aiohttp.ClientTimeout(total=None, connect=60, sock_read=None) self._session = aiohttp.ClientSession( connector=connector, trust_env=trust_env, @@ -102,7 +102,7 @@ class CivitaiClient: return headers async def _download_file(self, url: str, save_dir: str, default_filename: str, progress_callback=None) -> Tuple[bool, str]: - """Download file with content-disposition support and progress tracking + """Download file with resumable downloads and retry mechanism Args: url: Download URL @@ -113,75 +113,190 @@ class CivitaiClient: Returns: Tuple[bool, str]: (success, save_path or error message) """ - logger.debug(f"Resolving DNS for: {url}") + max_retries = 5 + retry_count = 0 + base_delay = 2.0 # Base delay for exponential backoff + + # Initial setup session = await self._ensure_fresh_session() - try: - headers = self._get_request_headers() - - # Add Range header to allow resumable downloads - headers['Accept-Encoding'] = 'identity' # Disable compression for better chunked downloads - - logger.debug(f"Starting download from: {url}") - async with session.get(url, headers=headers, allow_redirects=True) as response: - if response.status != 200: - # Handle 401 unauthorized responses - if response.status == 401: + save_path = os.path.join(save_dir, default_filename) + part_path = save_path + '.part' + + # Get existing file size for resume + resume_offset = 0 + if os.path.exists(part_path): + resume_offset = os.path.getsize(part_path) + logger.info(f"Resuming download from offset {resume_offset} bytes") + + total_size = 0 + filename = default_filename + + while retry_count <= max_retries: + try: + headers = self._get_request_headers() + + # Add Range header for resume if we have partial data + if resume_offset > 0: + headers['Range'] = f'bytes={resume_offset}-' + + # Add Range header to allow resumable downloads + headers['Accept-Encoding'] = 'identity' # Disable compression for better chunked downloads + + logger.debug(f"Download attempt {retry_count + 1}/{max_retries + 1} from: {url}") + if resume_offset > 0: + logger.debug(f"Requesting range from byte {resume_offset}") + + async with session.get(url, headers=headers, allow_redirects=True) as response: + # Handle different response codes + if response.status == 200: + # Full content response + if resume_offset > 0: + # Server doesn't support ranges, restart from beginning + logger.warning("Server doesn't support range requests, restarting download") + resume_offset = 0 + if os.path.exists(part_path): + os.remove(part_path) + elif response.status == 206: + # Partial content response (resume successful) + content_range = response.headers.get('Content-Range') + if content_range: + # Parse total size from Content-Range header (e.g., "bytes 1024-2047/2048") + range_parts = content_range.split('/') + if len(range_parts) == 2: + total_size = int(range_parts[1]) + logger.info(f"Successfully resumed download from byte {resume_offset}") + elif response.status == 416: + # Range not satisfiable - file might be complete or corrupted + if os.path.exists(part_path): + part_size = os.path.getsize(part_path) + logger.warning(f"Range not satisfiable. Part file size: {part_size}") + # Try to get actual file size + head_response = await session.head(url, headers=self._get_request_headers()) + if head_response.status == 200: + actual_size = int(head_response.headers.get('content-length', 0)) + if part_size == actual_size: + # File is complete, just rename it + os.rename(part_path, save_path) + if progress_callback: + await progress_callback(100) + return True, save_path + # Remove corrupted part file and restart + os.remove(part_path) + resume_offset = 0 + continue + elif response.status == 401: logger.warning(f"Unauthorized access to resource: {url} (Status 401)") - return False, "Invalid or missing CivitAI API key, or early access restriction." - - # Handle other client errors that might be permission-related - if response.status == 403: + elif response.status == 403: logger.warning(f"Forbidden access to resource: {url} (Status 403)") return False, "Access forbidden: You don't have permission to download this file." + else: + logger.error(f"Download failed for {url} with status {response.status}") + return False, f"Download failed with status {response.status}" + + # Get filename from content-disposition header (only on first attempt) + if retry_count == 0: + content_disposition = response.headers.get('Content-Disposition') + parsed_filename = self._parse_content_disposition(content_disposition) + if parsed_filename: + filename = parsed_filename + # Update paths with correct filename + save_path = os.path.join(save_dir, filename) + new_part_path = save_path + '.part' + # Rename existing part file if filename changed + if part_path != new_part_path and os.path.exists(part_path): + os.rename(part_path, new_part_path) + part_path = new_part_path - # Generic error response for other status codes - logger.error(f"Download failed for {url} with status {response.status}") - return False, f"Download failed with status {response.status}" + # Get total file size for progress calculation (if not set from Content-Range) + if total_size == 0: + total_size = int(response.headers.get('content-length', 0)) + if response.status == 206: + # For partial content, add the offset to get total file size + total_size += resume_offset - # Get filename from content-disposition header - content_disposition = response.headers.get('Content-Disposition') - filename = self._parse_content_disposition(content_disposition) - if not filename: - filename = default_filename - - save_path = os.path.join(save_dir, filename) - - # Get total file size for progress calculation - total_size = int(response.headers.get('content-length', 0)) - current_size = 0 - last_progress_report_time = datetime.now() + current_size = resume_offset + last_progress_report_time = datetime.now() - # Stream download to file with progress updates using larger buffer - loop = asyncio.get_running_loop() - with open(save_path, 'wb') as f: - async for chunk in response.content.iter_chunked(self.chunk_size): - if chunk: - # Run blocking file write in executor - await loop.run_in_executor(None, f.write, chunk) - current_size += len(chunk) - - # Limit progress update frequency to reduce overhead - now = datetime.now() - time_diff = (now - last_progress_report_time).total_seconds() - - if progress_callback and total_size and time_diff >= 1.0: - progress = (current_size / total_size) * 100 - await progress_callback(progress) - last_progress_report_time = now - - # Ensure 100% progress is reported - if progress_callback: - await progress_callback(100) + # Stream download to file with progress updates using larger buffer + loop = asyncio.get_running_loop() + mode = 'ab' if resume_offset > 0 else 'wb' + with open(part_path, mode) as f: + async for chunk in response.content.iter_chunked(self.chunk_size): + if chunk: + # Run blocking file write in executor + await loop.run_in_executor(None, f.write, chunk) + current_size += len(chunk) + + # Limit progress update frequency to reduce overhead + now = datetime.now() + time_diff = (now - last_progress_report_time).total_seconds() + + if progress_callback and total_size and time_diff >= 1.0: + progress = (current_size / total_size) * 100 + await progress_callback(progress) + last_progress_report_time = now + + # Download completed successfully + # Verify file size if total_size was provided + final_size = os.path.getsize(part_path) + if total_size > 0 and final_size != total_size: + logger.warning(f"File size mismatch. Expected: {total_size}, Got: {final_size}") + # Don't treat this as fatal error, rename anyway + + # Atomically rename .part to final file with retries + max_rename_attempts = 5 + rename_attempt = 0 + rename_success = False + + while rename_attempt < max_rename_attempts and not rename_success: + try: + os.rename(part_path, save_path) + rename_success = True + except PermissionError as e: + rename_attempt += 1 + if rename_attempt < max_rename_attempts: + logger.info(f"File still in use, retrying rename in 2 seconds (attempt {rename_attempt}/{max_rename_attempts})") + await asyncio.sleep(2) # Wait before retrying + else: + logger.error(f"Failed to rename file after {max_rename_attempts} attempts: {e}") + return False, f"Failed to finalize download: {str(e)}" + + # Ensure 100% progress is reported + if progress_callback: + await progress_callback(100) - return True, save_path + return True, save_path + + except (aiohttp.ClientError, aiohttp.ClientPayloadError, + aiohttp.ServerDisconnectedError, asyncio.TimeoutError) as e: + retry_count += 1 + logger.warning(f"Network error during download (attempt {retry_count}/{max_retries + 1}): {e}") - except aiohttp.ClientError as e: - logger.error(f"Network error during download: {e}") - return False, f"Network error: {str(e)}" - except Exception as e: - logger.error(f"Download error: {e}") - return False, str(e) + if retry_count <= max_retries: + # Calculate delay with exponential backoff + delay = base_delay * (2 ** (retry_count - 1)) + logger.info(f"Retrying in {delay} seconds...") + await asyncio.sleep(delay) + + # Update resume offset for next attempt + if os.path.exists(part_path): + resume_offset = os.path.getsize(part_path) + logger.info(f"Will resume from byte {resume_offset}") + + # Refresh session to get new connection + await self.close() + session = await self._ensure_fresh_session() + continue + else: + logger.error(f"Max retries exceeded for download: {e}") + return False, f"Network error after {max_retries + 1} attempts: {str(e)}" + + except Exception as e: + logger.error(f"Unexpected download error: {e}") + return False, str(e) + + return False, f"Download failed after {max_retries + 1} attempts" async def get_model_by_hash(self, model_hash: str) -> Optional[Dict]: try: diff --git a/py/services/download_manager.py b/py/services/download_manager.py index cda91efd..db3c1e14 100644 --- a/py/services/download_manager.py +++ b/py/services/download_manager.py @@ -274,9 +274,9 @@ class DownloadManager: from datetime import datetime date_obj = datetime.fromisoformat(early_access_date.replace('Z', '+00:00')) formatted_date = date_obj.strftime('%Y-%m-%d') - early_access_msg = f"This model requires early access payment (until {formatted_date}). " + early_access_msg = f"This model requires payment (until {formatted_date}). " except: - early_access_msg = "This model requires early access payment. " + early_access_msg = "This model requires payment. " early_access_msg += "Please ensure you have purchased early access and are logged in to Civitai." logger.warning(f"Early access model detected: {version_info.get('name', 'Unknown')}") @@ -321,6 +321,10 @@ class DownloadManager: download_id=download_id ) + # If early_access_msg exists and download failed, replace error message + if 'early_access_msg' in locals() and not result.get('success', False): + result['error'] = early_access_msg + return result except Exception as e: @@ -392,11 +396,13 @@ class DownloadManager: try: civitai_client = await self._get_civitai_client() save_path = metadata.file_path + part_path = save_path + '.part' metadata_path = os.path.splitext(save_path)[0] + '.metadata.json' - # Store file path in active_downloads for potential cleanup + # Store file paths in active_downloads for potential cleanup if download_id and download_id in self._active_downloads: self._active_downloads[download_id]['file_path'] = save_path + self._active_downloads[download_id]['part_path'] = part_path # Download preview image if available images = version_info.get('images', []) @@ -463,10 +469,22 @@ class DownloadManager: ) if not success: - # Clean up files on failure - for path in [save_path, metadata_path, metadata.preview_url]: + # Clean up files on failure, but preserve .part file for resume + cleanup_files = [metadata_path] + if metadata.preview_url and os.path.exists(metadata.preview_url): + cleanup_files.append(metadata.preview_url) + + for path in cleanup_files: if path and os.path.exists(path): - os.remove(path) + try: + os.remove(path) + except Exception as e: + logger.warning(f"Failed to cleanup file {path}: {e}") + + # Log but don't remove .part file to allow resume + if os.path.exists(part_path): + logger.info(f"Preserving partial download for resume: {part_path}") + return {'success': False, 'error': result} # 4. Update file information (size and modified time) @@ -502,10 +520,18 @@ class DownloadManager: except Exception as e: logger.error(f"Error in _execute_download: {e}", exc_info=True) - # Clean up partial downloads - for path in [save_path, metadata_path]: + # Clean up partial downloads except .part file + cleanup_files = [metadata_path] + if hasattr(metadata, 'preview_url') and metadata.preview_url and os.path.exists(metadata.preview_url): + cleanup_files.append(metadata.preview_url) + + for path in cleanup_files: if path and os.path.exists(path): - os.remove(path) + try: + os.remove(path) + except Exception as e: + logger.warning(f"Failed to cleanup file {path}: {e}") + return {'success': False, 'error': str(e)} async def _handle_download_progress(self, file_progress: float, progress_callback): @@ -547,35 +573,48 @@ class DownloadManager: except (asyncio.CancelledError, asyncio.TimeoutError): pass - # Clean up partial downloads + # Clean up ALL files including .part when user cancels download_info = self._active_downloads.get(download_id) - if download_info and 'file_path' in download_info: - # Delete the partial file - file_path = download_info['file_path'] - if os.path.exists(file_path): - try: - os.unlink(file_path) - logger.debug(f"Deleted partial download: {file_path}") - except Exception as e: - logger.error(f"Error deleting partial file: {e}") + if download_info: + # Delete the main file + if 'file_path' in download_info: + file_path = download_info['file_path'] + if os.path.exists(file_path): + try: + os.unlink(file_path) + logger.debug(f"Deleted cancelled download: {file_path}") + except Exception as e: + logger.error(f"Error deleting file: {e}") + + # Delete the .part file (only on user cancellation) + if 'part_path' in download_info: + part_path = download_info['part_path'] + if os.path.exists(part_path): + try: + os.unlink(part_path) + logger.debug(f"Deleted partial download: {part_path}") + except Exception as e: + logger.error(f"Error deleting part file: {e}") # Delete metadata file if exists - metadata_path = os.path.splitext(file_path)[0] + '.metadata.json' - if os.path.exists(metadata_path): - try: - os.unlink(metadata_path) - except Exception as e: - logger.error(f"Error deleting metadata file: {e}") - - # Delete preview file if exists (.webp or .mp4) - for preview_ext in ['.webp', '.mp4']: - preview_path = os.path.splitext(file_path)[0] + preview_ext - if os.path.exists(preview_path): + if 'file_path' in download_info: + file_path = download_info['file_path'] + metadata_path = os.path.splitext(file_path)[0] + '.metadata.json' + if os.path.exists(metadata_path): try: - os.unlink(preview_path) - logger.debug(f"Deleted preview file: {preview_path}") + os.unlink(metadata_path) except Exception as e: - logger.error(f"Error deleting preview file: {e}") + logger.error(f"Error deleting metadata file: {e}") + + # Delete preview file if exists (.webp or .mp4) + for preview_ext in ['.webp', '.mp4']: + preview_path = os.path.splitext(file_path)[0] + preview_ext + if os.path.exists(preview_path): + try: + os.unlink(preview_path) + logger.debug(f"Deleted preview file: {preview_path}") + except Exception as e: + logger.error(f"Error deleting preview file: {e}") return {'success': True, 'message': 'Download cancelled successfully'} except Exception as e: