mirror of https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-03-25 15:15:44 -03:00
feat(civarchive_client): remove HTML scraping implementation and bs4 dependency
Remove the legacy HTML scraping implementation of the get_model_by_url method along with its BeautifulSoup dependency. The functionality has been replaced by the API-based implementation in the get_model_version method. This simplifies the codebase and drops the optional bs4 requirement, making the client more maintainable and reliable.
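For context, the API-based path that replaces the scraper can be pictured as a plain JSON fetch. This is a minimal sketch only: the endpoint URL, parameters, and function name are illustrative assumptions, not the repository's actual get_model_version code (which also routes requests through the project's shared downloader rather than opening its own session).

import aiohttp  # illustrative; the real client reuses the project's downloader session

# Hypothetical sketch of fetching version metadata as JSON instead of
# scraping HTML. The endpoint path and response shape are assumptions,
# not CivArchive's documented API.
async def fetch_version_json(model_id: int, version_id: int) -> dict | None:
    url = f"https://civarchive.com/api/models/{model_id}/versions/{version_id}"  # assumed endpoint
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                return None
            # The JSON is consumed directly; no BeautifulSoup parsing step remains.
            return await response.json()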
@@ -1,4 +1,3 @@
-import os
 import json
 import logging
 import asyncio
@@ -8,22 +7,6 @@ from .model_metadata_provider import CivArchiveModelMetadataProvider, ModelMetad
 from .downloader import get_downloader
 from .errors import RateLimitError
 
-try:
-    from bs4 import BeautifulSoup
-except ImportError as exc:
-    BeautifulSoup = None  # type: ignore[assignment]
-    _BS4_IMPORT_ERROR = exc
-else:
-    _BS4_IMPORT_ERROR = None
-
-def _require_beautifulsoup():
-    if BeautifulSoup is None:
-        raise RuntimeError(
-            "BeautifulSoup (bs4) is required for CivArchive client. "
-            "Install it with 'pip install beautifulsoup4'."
-        ) from _BS4_IMPORT_ERROR
-    return BeautifulSoup
-
 logger = logging.getLogger(__name__)
 
 class CivArchiveClient:
@@ -446,109 +429,3 @@ class CivArchiveClient:
         if version is None:
             return None, "Model not found"
         return version, None
-
-    async def get_model_by_url(self, url) -> Optional[Dict]:
-        """Get specific model version by parsing CivArchive HTML page (legacy method)
-
-        This is the original HTML scraping implementation, kept for reference and new sites added not in api.
-        The primary get_model_version() now uses the API instead.
-        """
-        try:
-            # Construct CivArchive URL
-            url = f"https://civarchive.com/{url}"
-            downloader = await get_downloader()
-            session = await downloader.session
-            async with session.get(url) as response:
-                if response.status != 200:
-                    return None
-
-                html_content = await response.text()
-
-                # Parse HTML to extract JSON data
-                soup_parser = _require_beautifulsoup()
-                soup = soup_parser(html_content, 'html.parser')
-                script_tag = soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'})
-
-                if not script_tag:
-                    return None
-
-                # Parse JSON content
-                json_data = json.loads(script_tag.string)
-                model_data = json_data.get('props', {}).get('pageProps', {}).get('model')
-
-                if not model_data or 'version' not in model_data:
-                    return None
-
-                # Extract version data as base
-                version = model_data['version'].copy()
-
-                # Restructure stats
-                if 'downloadCount' in version and 'ratingCount' in version and 'rating' in version:
-                    version['stats'] = {
-                        'downloadCount': version.pop('downloadCount'),
-                        'ratingCount': version.pop('ratingCount'),
-                        'rating': version.pop('rating')
-                    }
-
-                # Rename trigger to trainedWords
-                if 'trigger' in version:
-                    version['trainedWords'] = version.pop('trigger')
-
-                # Transform files data to expected format
-                if 'files' in version:
-                    transformed_files = []
-                    for file_data in version['files']:
-                        # Find first available mirror (deletedAt is null)
-                        available_mirror = None
-                        for mirror in file_data.get('mirrors', []):
-                            if mirror.get('deletedAt') is None:
-                                available_mirror = mirror
-                                break
-
-                        # Create transformed file entry
-                        transformed_file = {
-                            'id': file_data.get('id'),
-                            'sizeKB': file_data.get('sizeKB'),
-                            'name': available_mirror.get('filename', file_data.get('name')) if available_mirror else file_data.get('name'),
-                            'type': file_data.get('type'),
-                            'downloadUrl': available_mirror.get('url') if available_mirror else None,
-                            'primary': file_data.get('is_primary', False),
-                            'mirrors': file_data.get('mirrors', [])
-                        }
-
-                        # Transform hash format
-                        if 'sha256' in file_data:
-                            transformed_file['hashes'] = {
-                                'SHA256': file_data['sha256'].upper()
-                            }
-
-                        transformed_files.append(transformed_file)
-
-                    version['files'] = transformed_files
-
-                # Add model information
-                version['model'] = {
-                    'name': model_data.get('name'),
-                    'type': model_data.get('type'),
-                    'nsfw': model_data.get('is_nsfw', False),
-                    'description': model_data.get('description'),
-                    'tags': model_data.get('tags', [])
-                }
-
-                version['creator'] = {
-                    'username': model_data.get('username'),
-                    'image': ''
-                }
-
-                # Add source identifier
-                version['source'] = 'civarchive'
-                version['is_deleted'] = json_data.get('query', {}).get('is_deleted', False)
-
-                return version
-
-        except RateLimitError:
-            raise
-        except Exception as e:
-            logger.error(f"Error fetching CivArchive model version (scraping) {url}: {e}")
-            return None
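The transformed version dict the deleted method produced (stats, trainedWords, and files entries carrying downloadUrl, primary, and hashes) is the shape downstream code consumes. A minimal sketch of such a consumer, written here for illustration rather than taken from the repository:

# Illustrative only: pick a usable download URL from the transformed
# version['files'] list (the file flagged primary first, then any file
# that still has a live mirror URL).
def pick_download_url(version: dict) -> str | None:
    files = version.get('files') or []
    ordered = sorted(files, key=lambda f: not f.get('primary', False))
    for file_entry in ordered:
        if file_entry.get('downloadUrl'):
            return file_entry['downloadUrl']
    return None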