diff --git a/py/lora_manager.py b/py/lora_manager.py index 51655b45..76745943 100644 --- a/py/lora_manager.py +++ b/py/lora_manager.py @@ -4,6 +4,7 @@ from server import PromptServer # type: ignore from .config import config from .routes.lora_routes import LoraRoutes from .routes.api_routes import ApiRoutes +from .routes.recipe_routes import RecipeRoutes from .services.lora_scanner import LoraScanner from .services.file_monitor import LoraFileMonitor from .services.lora_cache import LoraCache @@ -63,6 +64,7 @@ class LoraManager: routes.setup_routes(app) ApiRoutes.setup_routes(app, monitor) + RecipeRoutes.setup_routes(app) # Store monitor in app for cleanup app['lora_monitor'] = monitor diff --git a/py/routes/api_routes.py b/py/routes/api_routes.py index 01666146..14c003e8 100644 --- a/py/routes/api_routes.py +++ b/py/routes/api_routes.py @@ -14,6 +14,7 @@ from ..services.websocket_manager import ws_manager from ..services.settings_manager import settings import asyncio from .update_routes import UpdateRoutes +from ..services.recipe_scanner import RecipeScanner logger = logging.getLogger(__name__) @@ -44,6 +45,7 @@ class ApiRoutes: app.router.add_post('/loras/api/save-metadata', routes.save_metadata) app.router.add_get('/api/lora-preview-url', routes.get_lora_preview_url) # Add new route app.router.add_post('/api/move_models_bulk', routes.move_models_bulk) + app.router.add_get('/api/recipes', cls.handle_get_recipes) # Add update check routes UpdateRoutes.setup_routes(app) @@ -691,3 +693,29 @@ class ApiRoutes: except Exception as e: logger.error(f"Error moving models in bulk: {e}", exc_info=True) return web.Response(text=str(e), status=500) + + @staticmethod + async def handle_get_recipes(request): + """API endpoint for getting paginated recipes""" + try: + # Get query parameters with defaults + page = int(request.query.get('page', '1')) + page_size = int(request.query.get('page_size', '20')) + sort_by = request.query.get('sort_by', 'date') + search = request.query.get('search', None) + + # Get scanner instance + scanner = RecipeScanner(LoraScanner()) + + # Get paginated data + result = await scanner.get_paginated_data( + page=page, + page_size=page_size, + sort_by=sort_by, + search=search + ) + + return web.json_response(result) + except Exception as e: + logger.error(f"Error retrieving recipes: {e}", exc_info=True) + return web.json_response({"error": str(e)}, status=500) diff --git a/py/routes/lora_routes.py b/py/routes/lora_routes.py index 64c71385..b65758c2 100644 --- a/py/routes/lora_routes.py +++ b/py/routes/lora_routes.py @@ -4,6 +4,7 @@ import jinja2 from typing import Dict, List import logging from ..services.lora_scanner import LoraScanner +from ..services.recipe_scanner import RecipeScanner from ..config import config from ..services.settings_manager import settings # Add this import @@ -15,6 +16,7 @@ class LoraRoutes: def __init__(self): self.scanner = LoraScanner() + self.recipe_scanner = RecipeScanner(self.scanner) self.template_env = jinja2.Environment( loader=jinja2.FileSystemLoader(config.templates_path), autoescape=True @@ -87,6 +89,46 @@ class LoraRoutes: status=500 ) + async def handle_recipes_page(self, request: web.Request) -> web.Response: + """Handle GET /loras/recipes request""" + try: + # Check cache initialization status + is_initializing = ( + self.recipe_scanner._cache is None and + (self.recipe_scanner._initialization_task is not None and + not self.recipe_scanner._initialization_task.done()) + ) + + if is_initializing: + # If initializing, return a loading page + template = self.template_env.get_template('recipes.html') + rendered = template.render( + is_initializing=True, + settings=settings + ) + else: + # Normal flow + cache = await self.recipe_scanner.get_cached_data() + template = self.template_env.get_template('recipes.html') + rendered = template.render( + recipes=cache.sorted_by_date[:20], # Show first 20 recipes by date + is_initializing=False, + settings=settings + ) + + return web.Response( + text=rendered, + content_type='text/html' + ) + + except Exception as e: + logger.error(f"Error handling recipes request: {e}", exc_info=True) + return web.Response( + text="Error loading recipes page", + status=500 + ) + def setup_routes(self, app: web.Application): """Register routes with the application""" app.router.add_get('/loras', self.handle_loras_page) + app.router.add_get('/loras/recipes', self.handle_recipes_page) diff --git a/py/routes/recipe_routes.py b/py/routes/recipe_routes.py new file mode 100644 index 00000000..8e1ac310 --- /dev/null +++ b/py/routes/recipe_routes.py @@ -0,0 +1,134 @@ +import os +import logging +import sys +from aiohttp import web +from typing import Dict + +from ..services.recipe_scanner import RecipeScanner +from ..services.lora_scanner import LoraScanner +from ..config import config + +logger = logging.getLogger(__name__) +print("Recipe Routes module loaded", file=sys.stderr) + +class RecipeRoutes: + """API route handlers for Recipe management""" + + def __init__(self): + print("Initializing RecipeRoutes", file=sys.stderr) + self.recipe_scanner = RecipeScanner(LoraScanner()) + + # Pre-warm the cache + self._init_cache_task = None + + @classmethod + def setup_routes(cls, app: web.Application): + """Register API routes""" + print("Setting up recipe routes", file=sys.stderr) + routes = cls() + app.router.add_get('/api/recipes', routes.get_recipes) + app.router.add_get('/api/recipe/{recipe_id}', routes.get_recipe_detail) + + # Start cache initialization + app.on_startup.append(routes._init_cache) + + print("Recipe routes setup complete", file=sys.stderr) + + async def _init_cache(self, app): + """Initialize cache on startup""" + print("Pre-warming recipe cache...", file=sys.stderr) + try: + # Diagnose lora scanner first + await self.recipe_scanner._lora_scanner.diagnose_hash_index() + + # Force a cache refresh + await self.recipe_scanner.get_cached_data(force_refresh=True) + print("Recipe cache pre-warming complete", file=sys.stderr) + except Exception as e: + print(f"Error pre-warming recipe cache: {e}", file=sys.stderr) + + async def get_recipes(self, request: web.Request) -> web.Response: + """API endpoint for getting paginated recipes""" + try: + print("API: GET /api/recipes", file=sys.stderr) + # Get query parameters with defaults + page = int(request.query.get('page', '1')) + page_size = int(request.query.get('page_size', '20')) + sort_by = request.query.get('sort_by', 'date') + search = request.query.get('search', None) + + # Get paginated data + result = await self.recipe_scanner.get_paginated_data( + page=page, + page_size=page_size, + sort_by=sort_by, + search=search + ) + + # Format the response data with static URLs for file paths + for item in result['items']: + item['preview_url'] = item['file_path'] + # Convert file path to URL + item['file_url'] = self._format_recipe_file_url(item['file_path']) + + return web.json_response(result) + except Exception as e: + logger.error(f"Error retrieving recipes: {e}", exc_info=True) + print(f"API Error: {e}", file=sys.stderr) + return web.json_response({"error": str(e)}, status=500) + + async def get_recipe_detail(self, request: web.Request) -> web.Response: + """Get detailed information about a specific recipe""" + try: + recipe_id = request.match_info['recipe_id'] + + # Get all recipes from cache + cache = await self.recipe_scanner.get_cached_data() + + # Find the specific recipe + recipe = next((r for r in cache.raw_data if str(r.get('id', '')) == recipe_id), None) + + if not recipe: + return web.json_response({"error": "Recipe not found"}, status=404) + + # Format recipe data + formatted_recipe = self._format_recipe_data(recipe) + + return web.json_response(formatted_recipe) + except Exception as e: + logger.error(f"Error retrieving recipe details: {e}", exc_info=True) + return web.json_response({"error": str(e)}, status=500) + + def _format_recipe_file_url(self, file_path: str) -> str: + """Format file path for recipe image as a URL""" + # This is a simplified example - in real implementation, + # you would map this to a static route that can serve the file + + # For recipes folder in the first lora root + for idx, root in enumerate(config.loras_roots, start=1): + recipes_dir = os.path.join(root, "recipes") + if file_path.startswith(recipes_dir): + relative_path = os.path.relpath(file_path, root) + return f"/loras_static/root{idx}/{relative_path}" + + return file_path # Return original path if no mapping found + + def _format_recipe_data(self, recipe: Dict) -> Dict: + """Format recipe data for API response""" + formatted = {**recipe} # Copy all fields + + # Format file paths to URLs + if 'file_path' in formatted: + formatted['file_url'] = self._format_recipe_file_url(formatted['file_path']) + + # Format dates for display + for date_field in ['created_date', 'modified']: + if date_field in formatted: + formatted[f"{date_field}_formatted"] = self._format_timestamp(formatted[date_field]) + + return formatted + + def _format_timestamp(self, timestamp: float) -> str: + """Format timestamp for display""" + from datetime import datetime + return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') \ No newline at end of file diff --git a/py/services/civitai_client.py b/py/services/civitai_client.py index ae08e56c..286fcd78 100644 --- a/py/services/civitai_client.py +++ b/py/services/civitai_client.py @@ -167,4 +167,30 @@ class CivitaiClient: """Close the session if it exists""" if self._session is not None: await self._session.close() - self._session = None \ No newline at end of file + self._session = None + + async def _get_hash_from_civitai(self, model_version_id: str) -> Optional[str]: + """Get hash from Civitai API""" + try: + if not self._session: + return None + + logger.info(f"Fetching model version info from Civitai for ID: {model_version_id}") + version_info = await self._session.get(f"{self.base_url}/model-versions/{model_version_id}") + + if not version_info or not version_info.json().get('files'): + logger.warning(f"No files found in version info for ID: {model_version_id}") + return None + + # Get hash from the first file + for file_info in version_info.json().get('files', []): + if file_info.get('hashes', {}).get('SHA256'): + # Convert hash to lowercase to standardize + hash_value = file_info['hashes']['SHA256'].lower() + return hash_value + + logger.warning(f"No SHA256 hash found in version info for ID: {model_version_id}") + return None + except Exception as e: + logger.error(f"Error getting hash from Civitai: {e}") + return None \ No newline at end of file diff --git a/py/services/lora_hash_index.py b/py/services/lora_hash_index.py index f5b6c4e7..8285d293 100644 --- a/py/services/lora_hash_index.py +++ b/py/services/lora_hash_index.py @@ -15,11 +15,13 @@ class LoraHashIndex: """Add or update a hash -> path mapping""" if not sha256 or not file_path: return - self._hash_to_path[sha256] = file_path + # Always store lowercase hashes for consistency + self._hash_to_path[sha256.lower()] = file_path def remove_entry(self, sha256: str) -> None: """Remove a hash entry""" - self._hash_to_path.pop(sha256, None) + if sha256: + self._hash_to_path.pop(sha256.lower(), None) def remove_by_path(self, file_path: str) -> None: """Remove entry by file path""" @@ -30,7 +32,9 @@ class LoraHashIndex: def get_path(self, sha256: str) -> Optional[str]: """Get file path for a given hash""" - return self._hash_to_path.get(sha256) + if not sha256: + return None + return self._hash_to_path.get(sha256.lower()) def get_hash(self, file_path: str) -> Optional[str]: """Get hash for a given file path""" @@ -41,7 +45,9 @@ class LoraHashIndex: def has_hash(self, sha256: str) -> bool: """Check if hash exists in index""" - return sha256 in self._hash_to_path + if not sha256: + return False + return sha256.lower() in self._hash_to_path def clear(self) -> None: """Clear all entries""" diff --git a/py/services/lora_scanner.py b/py/services/lora_scanner.py index 857a4369..88d53049 100644 --- a/py/services/lora_scanner.py +++ b/py/services/lora_scanner.py @@ -11,6 +11,7 @@ from ..utils.file_utils import load_metadata, get_file_info from .lora_cache import LoraCache from difflib import SequenceMatcher from .lora_hash_index import LoraHashIndex +import sys logger = logging.getLogger(__name__) @@ -505,3 +506,38 @@ class LoraScanner: """Get hash for a LoRA by its file path""" return self._hash_index.get_hash(file_path) + async def diagnose_hash_index(self): + """Diagnostic method to verify hash index functionality""" + print("\n\n*** DIAGNOSING LORA HASH INDEX ***\n\n", file=sys.stderr) + + # First check if the hash index has any entries + if hasattr(self, '_hash_index'): + index_entries = len(self._hash_index._hash_to_path) + print(f"Hash index has {index_entries} entries", file=sys.stderr) + + # Print a few example entries if available + if index_entries > 0: + print("\nSample hash index entries:", file=sys.stderr) + count = 0 + for hash_val, path in self._hash_index._hash_to_path.items(): + if count < 5: # Just show the first 5 + print(f"Hash: {hash_val[:8]}... -> Path: {path}", file=sys.stderr) + count += 1 + else: + break + else: + print("Hash index not initialized", file=sys.stderr) + + # Try looking up by a known hash for testing + if not hasattr(self, '_hash_index') or not self._hash_index._hash_to_path: + print("No hash entries to test lookup with", file=sys.stderr) + return + + test_hash = next(iter(self._hash_index._hash_to_path.keys())) + test_path = self._hash_index.get_path(test_hash) + print(f"\nTest lookup by hash: {test_hash[:8]}... -> {test_path}", file=sys.stderr) + + # Also test reverse lookup + test_hash_result = self._hash_index.get_hash(test_path) + print(f"Test reverse lookup: {test_path} -> {test_hash_result[:8]}...\n\n", file=sys.stderr) + diff --git a/py/services/recipe_cache.py b/py/services/recipe_cache.py new file mode 100644 index 00000000..c10f4131 --- /dev/null +++ b/py/services/recipe_cache.py @@ -0,0 +1,51 @@ +import asyncio +from typing import List, Dict +from dataclasses import dataclass +from operator import itemgetter + +@dataclass +class RecipeCache: + """Cache structure for Recipe data""" + raw_data: List[Dict] + sorted_by_name: List[Dict] + sorted_by_date: List[Dict] + + def __post_init__(self): + self._lock = asyncio.Lock() + + async def resort(self, name_only: bool = False): + """Resort all cached data views""" + async with self._lock: + self.sorted_by_name = sorted( + self.raw_data, + key=lambda x: x.get('title', '').lower() # Case-insensitive sort + ) + if not name_only: + self.sorted_by_date = sorted( + self.raw_data, + key=itemgetter('created_date', 'file_path'), + reverse=True + ) + + async def update_recipe_metadata(self, file_path: str, metadata: Dict) -> bool: + """Update metadata for a specific recipe in all cached data + + Args: + file_path: The file path of the recipe to update + metadata: The new metadata + + Returns: + bool: True if the update was successful, False if the recipe wasn't found + """ + async with self._lock: + # Update in raw_data + for item in self.raw_data: + if item['file_path'] == file_path: + item.update(metadata) + break + else: + return False # Recipe not found + + # Resort to reflect changes + await self.resort() + return True \ No newline at end of file diff --git a/py/services/recipe_scanner.py b/py/services/recipe_scanner.py new file mode 100644 index 00000000..5e244ab8 --- /dev/null +++ b/py/services/recipe_scanner.py @@ -0,0 +1,610 @@ +import os +import logging +import asyncio +import json +import re +from typing import List, Dict, Optional, Any +from datetime import datetime +from ..utils.exif_utils import ExifUtils +from ..config import config +from .recipe_cache import RecipeCache +from .lora_scanner import LoraScanner +from .civitai_client import CivitaiClient +import sys + +print("Recipe Scanner module loaded", file=sys.stderr) + +def setup_logger(): + """Configure logger for recipe scanner""" + # First, print directly to stderr + print("Setting up recipe scanner logger", file=sys.stderr) + + # Create a stderr handler + handler = logging.StreamHandler(sys.stderr) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + + # Configure recipe logger + recipe_logger = logging.getLogger(__name__) + recipe_logger.setLevel(logging.INFO) + + # Remove existing handlers if any + for h in recipe_logger.handlers: + recipe_logger.removeHandler(h) + + recipe_logger.addHandler(handler) + recipe_logger.propagate = False + + # Also ensure the root logger has a handler + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + + # Check if the root logger already has handlers + if not root_logger.handlers: + root_logger.addHandler(handler) + + print(f"Logger setup complete: {__name__}", file=sys.stderr) + return recipe_logger + +# Use our configured logger +logger = setup_logger() + +class RecipeScanner: + """Service for scanning and managing recipe images""" + + _instance = None + _lock = asyncio.Lock() + + def __new__(cls, lora_scanner: Optional[LoraScanner] = None): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._lora_scanner = lora_scanner + cls._instance._civitai_client = CivitaiClient() + return cls._instance + + def __init__(self, lora_scanner: Optional[LoraScanner] = None): + # Ensure initialization only happens once + if not hasattr(self, '_initialized'): + self._cache: Optional[RecipeCache] = None + self._initialization_lock = asyncio.Lock() + self._initialization_task: Optional[asyncio.Task] = None + if lora_scanner: + self._lora_scanner = lora_scanner + self._initialized = True + + @property + def recipes_dir(self) -> str: + """Get path to recipes directory""" + if not config.loras_roots: + return "" + + # Sort the lora roots case-insensitively + sorted_roots = sorted(config.loras_roots, key=lambda x: x.lower()) + + # Use the first sorted lora root as base + recipes_dir = os.path.join(sorted_roots[0], "recipes") + os.makedirs(recipes_dir, exist_ok=True) + logger.info(f"Using recipes directory: {recipes_dir}") + + return recipes_dir + + async def get_cached_data(self, force_refresh: bool = False) -> RecipeCache: + """Get cached recipe data, refresh if needed""" + async with self._initialization_lock: + + # If cache is unitialized but needs to respond to request, return empty cache + if self._cache is None and not force_refresh: + return RecipeCache( + raw_data=[], + sorted_by_name=[], + sorted_by_date=[] + ) + + # If initializing, wait for completion + if self._initialization_task and not self._initialization_task.done(): + try: + await self._initialization_task + except Exception as e: + logger.error(f"Recipe cache initialization failed: {e}") + self._initialization_task = None + + if (self._cache is None or force_refresh): + + # Create new initialization task + if not self._initialization_task or self._initialization_task.done(): + # First ensure the lora scanner is initialized + if self._lora_scanner: + await self._lora_scanner.get_cached_data() + + self._initialization_task = asyncio.create_task(self._initialize_cache()) + + try: + await self._initialization_task + except Exception as e: + logger.error(f"Recipe cache initialization failed: {e}") + # If cache already exists, continue using old cache + if self._cache is None: + raise # If no cache, raise exception + + logger.info(f"Recipe cache initialized with {len(self._cache.raw_data)} recipes") + logger.info(f"Recipe cache: {json.dumps(self._cache, indent=2)}") + + return self._cache + + async def _initialize_cache(self) -> None: + """Initialize or refresh the cache""" + try: + # Ensure lora scanner is fully initialized first + if self._lora_scanner: + logger.info("Recipe Manager: Waiting for lora scanner initialization to complete") + + # Force a fresh initialization of the lora scanner to ensure it's complete + lora_cache = await self._lora_scanner.get_cached_data(force_refresh=True) + + # Add a delay to ensure any background tasks complete + await asyncio.sleep(2) + + # Get the cache again to ensure we have the latest data + lora_cache = await self._lora_scanner.get_cached_data() + logger.info(f"Recipe Manager: Lora scanner initialized with {len(lora_cache.raw_data)} loras") + + # Verify hash index is built + if hasattr(self._lora_scanner, '_hash_index'): + hash_index_size = len(self._lora_scanner._hash_index._hash_to_path) if hasattr(self._lora_scanner._hash_index, '_hash_to_path') else 0 + logger.info(f"Recipe Manager: Lora hash index contains {hash_index_size} entries") + + # If hash index is empty but we have loras, consider this an error condition + if hash_index_size == 0 and len(lora_cache.raw_data) > 0: + logger.error("Recipe Manager: Lora hash index is empty despite having loras in cache") + await self._lora_scanner.diagnose_hash_index() + + # Wait another moment for hash index to potentially initialize + await asyncio.sleep(1) + + # Try to check again + hash_index_size = len(self._lora_scanner._hash_index._hash_to_path) if hasattr(self._lora_scanner._hash_index, '_hash_to_path') else 0 + logger.info(f"Recipe Manager: Lora hash index now contains {hash_index_size} entries") + else: + logger.warning("Recipe Manager: No lora hash index available") + else: + logger.warning("Recipe Manager: No lora scanner available") + + # Scan for recipe data + raw_data = await self.scan_all_recipes() + + # Update cache + self._cache = RecipeCache( + raw_data=raw_data, + sorted_by_name=[], + sorted_by_date=[] + ) + + # Resort cache + await self._cache.resort() + + self._initialization_task = None + logger.info("Recipe Manager: Cache initialization completed") + except Exception as e: + logger.error(f"Recipe Manager: Error initializing cache: {e}", exc_info=True) + self._cache = RecipeCache( + raw_data=[], + sorted_by_name=[], + sorted_by_date=[] + ) + + async def scan_all_recipes(self) -> List[Dict]: + """Scan all recipe images and return metadata""" + recipes = [] + recipes_dir = self.recipes_dir + + if not recipes_dir or not os.path.exists(recipes_dir): + logger.warning(f"Recipes directory not found: {recipes_dir}") + return recipes + + # Get all jpg/jpeg files in the recipes directory + image_files = [] + logger.info(f"Scanning for recipe images in {recipes_dir}") + for root, _, files in os.walk(recipes_dir): + image_count = sum(1 for f in files if f.lower().endswith(('.jpg', '.jpeg'))) + if image_count > 0: + logger.info(f"Found {image_count} potential recipe images in {root}") + for file in files: + if file.lower().endswith(('.jpg', '.jpeg')): + image_files.append(os.path.join(root, file)) + + # Process each image + for image_path in image_files: + recipe_data = await self._process_recipe_image(image_path) + if recipe_data: + recipes.append(recipe_data) + logger.info(f"Processed recipe: {recipe_data.get('title')}") + + logger.info(f"Successfully processed {len(recipes)} recipes") + + return recipes + + async def _process_recipe_image(self, image_path: str) -> Optional[Dict]: + """Process a single recipe image and return metadata""" + try: + print(f"Processing recipe image: {image_path}", file=sys.stderr) + logger.info(f"Processing recipe image: {image_path}") + + # Extract EXIF UserComment + user_comment = ExifUtils.extract_user_comment(image_path) + if not user_comment: + print(f"No EXIF UserComment found in {image_path}", file=sys.stderr) + logger.warning(f"No EXIF UserComment found in {image_path}") + return None + else: + print(f"Found UserComment: {user_comment[:50]}...", file=sys.stderr) + + # Parse metadata from UserComment + recipe_data = ExifUtils.parse_recipe_metadata(user_comment) + if not recipe_data: + print(f"Failed to parse recipe metadata from {image_path}", file=sys.stderr) + logger.warning(f"Failed to parse recipe metadata from {image_path}") + return None + + # Get file info + stat = os.stat(image_path) + file_name = os.path.basename(image_path) + title = os.path.splitext(file_name)[0] + + # Add common metadata + recipe_data.update({ + 'id': file_name, + 'file_path': image_path, + 'title': title, + 'modified': stat.st_mtime, + 'created_date': stat.st_ctime, + 'file_size': stat.st_size + }) + + # Update recipe metadata with missing information + metadata_updated = await self._update_recipe_metadata(recipe_data, user_comment) + recipe_data['_metadata_updated'] = metadata_updated + + # If metadata was updated, save back to image + if metadata_updated: + print(f"Updating metadata for {image_path}", file=sys.stderr) + logger.info(f"Updating metadata for {image_path}") + self._save_updated_metadata(image_path, user_comment, recipe_data) + + return recipe_data + except Exception as e: + print(f"Error processing recipe image {image_path}: {e}", file=sys.stderr) + logger.error(f"Error processing recipe image {image_path}: {e}") + import traceback + traceback.print_exc(file=sys.stderr) + return None + + def _create_basic_recipe_data(self, image_path: str) -> Dict: + """Create basic recipe data from file information""" + file_name = os.path.basename(image_path) + title = os.path.splitext(file_name)[0] + + return { + 'file_path': image_path.replace(os.sep, '/'), + 'title': title, + 'file_name': file_name, + 'modified': os.path.getmtime(image_path), + 'created_date': os.path.getctime(image_path), + 'loras': [] + } + + def _extract_created_date(self, user_comment: str) -> Optional[float]: + """Extract creation date from UserComment if present""" + try: + # Look for Created Date pattern + created_date_match = re.search(r'Created Date: ([^,}]+)', user_comment) + if created_date_match: + date_str = created_date_match.group(1).strip() + # Parse ISO format date + dt = datetime.fromisoformat(date_str.replace('Z', '+00:00')) + return dt.timestamp() + except Exception as e: + logger.error(f"Error extracting creation date: {e}") + + return None + + async def _update_lora_information(self, recipe_data: Dict) -> bool: + """Update LoRA information with hash and file_name + + Returns: + bool: True if metadata was updated + """ + if not recipe_data.get('loras'): + return False + + metadata_updated = False + + for lora in recipe_data['loras']: + logger.info(f"Processing LoRA: {lora.get('modelName', 'Unknown')}, ID: {lora.get('modelVersionId', 'No ID')}") + + # Skip if already has complete information + if 'hash' in lora and 'file_name' in lora and lora['file_name']: + logger.info(f"LoRA already has complete information") + continue + + # If has modelVersionId but no hash, look in lora cache first, then fetch from Civitai + if 'modelVersionId' in lora and not lora.get('hash'): + model_version_id = lora['modelVersionId'] + logger.info(f"Looking up hash for modelVersionId: {model_version_id}") + + # Try to find in lora cache first + hash_from_cache = await self._find_hash_in_lora_cache(model_version_id) + if hash_from_cache: + logger.info(f"Found hash in lora cache: {hash_from_cache}") + lora['hash'] = hash_from_cache + metadata_updated = True + else: + # If not in cache, fetch from Civitai + logger.info(f"Fetching hash from Civitai for {model_version_id}") + hash_from_civitai = await self._get_hash_from_civitai(model_version_id) + if hash_from_civitai: + logger.info(f"Got hash from Civitai: {hash_from_civitai}") + lora['hash'] = hash_from_civitai + metadata_updated = True + else: + logger.warning(f"Could not get hash for modelVersionId {model_version_id}") + + # If has hash but no file_name, look up in lora library + if 'hash' in lora and (not lora.get('file_name') or not lora['file_name']): + hash_value = lora['hash'] + logger.info(f"Looking up file_name for hash: {hash_value}") + + if self._lora_scanner.has_lora_hash(hash_value): + lora_path = self._lora_scanner.get_lora_path_by_hash(hash_value) + if lora_path: + file_name = os.path.splitext(os.path.basename(lora_path))[0] + logger.info(f"Found lora in library: {file_name}") + lora['file_name'] = file_name + metadata_updated = True + else: + # Lora not in library + logger.info(f"LoRA with hash {hash_value} not found in library") + lora['file_name'] = '' + metadata_updated = True + + return metadata_updated + + async def _find_hash_in_lora_cache(self, model_version_id: str) -> Optional[str]: + """Find hash in lora cache based on modelVersionId""" + try: + # Get all loras from cache + if not self._lora_scanner: + return None + + cache = await self._lora_scanner.get_cached_data() + if not cache or not cache.raw_data: + return None + + # Find lora with matching civitai.id + for lora in cache.raw_data: + civitai_data = lora.get('civitai', {}) + if civitai_data and str(civitai_data.get('id', '')) == str(model_version_id): + return lora.get('sha256') + + return None + except Exception as e: + logger.error(f"Error finding hash in lora cache: {e}") + return None + + async def _get_hash_from_civitai(self, model_version_id: str) -> Optional[str]: + """Get hash from Civitai API""" + try: + if not self._civitai_client: + return None + + logger.info(f"Fetching model version info from Civitai for ID: {model_version_id}") + version_info = await self._civitai_client.get_model_version_info(model_version_id) + + if not version_info or not version_info.get('files'): + logger.warning(f"No files found in version info for ID: {model_version_id}") + return None + + # Get hash from the first file + for file_info in version_info.get('files', []): + if file_info.get('hashes', {}).get('SHA256'): + return file_info['hashes']['SHA256'] + + logger.warning(f"No SHA256 hash found in version info for ID: {model_version_id}") + return None + except Exception as e: + logger.error(f"Error getting hash from Civitai: {e}") + return None + + def _save_updated_metadata(self, image_path: str, original_comment: str, recipe_data: Dict) -> None: + """Save updated metadata back to image file""" + try: + # Update the resources section with the updated lora data + resources_match = re.search(r'(Civitai resources: )(\[.*?\])(?:,|\})', original_comment) + if not resources_match: + logger.warning(f"Could not find Civitai resources section in {image_path}") + return + + resources_prefix = resources_match.group(1) + + # Generate updated resources array + resources = [] + + # Add checkpoint if exists + if recipe_data.get('checkpoint'): + resources.append(recipe_data['checkpoint']) + + # Add all loras + resources.extend(recipe_data.get('loras', [])) + + # Generate new resources JSON + updated_resources = json.dumps(resources) + + # Replace resources in original comment + updated_comment = original_comment.replace( + resources_match.group(0), + f"{resources_prefix}{updated_resources}," + ) + + # Update metadata section if it exists + metadata_match = re.search(r'(Civitai metadata: )(\{.*?\})', original_comment) + if metadata_match: + metadata_prefix = metadata_match.group(1) + + # Create metadata object with base_model + metadata = {} + if recipe_data.get('base_model'): + metadata['base_model'] = recipe_data['base_model'] + + # Generate new metadata JSON + updated_metadata = json.dumps(metadata) + + # Replace metadata in original comment + updated_comment = updated_comment.replace( + metadata_match.group(0), + f"{metadata_prefix}{updated_metadata}" + ) + + # Save back to image + logger.info(f"Saving updated metadata to {image_path}") + ExifUtils.update_user_comment(image_path, updated_comment) + + except Exception as e: + logger.error(f"Error saving updated metadata: {e}", exc_info=True) + + async def get_paginated_data(self, page: int, page_size: int, sort_by: str = 'date', search: str = None): + """Get paginated and filtered recipe data + + Args: + page: Current page number (1-based) + page_size: Number of items per page + sort_by: Sort method ('name' or 'date') + search: Search term + """ + cache = await self.get_cached_data() + + # Get base dataset + filtered_data = cache.sorted_by_date if sort_by == 'date' else cache.sorted_by_name + + # Apply search filter + if search: + filtered_data = [ + item for item in filtered_data + if search.lower() in str(item.get('title', '')).lower() or + search.lower() in str(item.get('prompt', '')).lower() + ] + + # Calculate pagination + total_items = len(filtered_data) + start_idx = (page - 1) * page_size + end_idx = min(start_idx + page_size, total_items) + + result = { + 'items': filtered_data[start_idx:end_idx], + 'total': total_items, + 'page': page, + 'page_size': page_size, + 'total_pages': (total_items + page_size - 1) // page_size + } + + return result + + async def _update_recipe_metadata(self, recipe_data: Dict, original_comment: str) -> bool: + """Update recipe metadata with missing information + + Returns: + bool: True if metadata was updated + """ + metadata_updated = False + + # Update lora information + for lora in recipe_data.get('loras', []): + # First check if modelVersionId exists and hash doesn't + if 'modelVersionId' in lora and not lora.get('hash'): + model_version_id = str(lora['modelVersionId']) + # Try to find hash in lora cache first + hash_from_cache = await self._find_hash_in_lora_cache(model_version_id) + if hash_from_cache: + logger.info(f"Found hash in cache for modelVersionId {model_version_id}") + lora['hash'] = hash_from_cache.lower() # Standardize to lowercase + metadata_updated = True + else: + # If not in cache, fetch from Civitai + logger.info(f"Fetching hash from Civitai for {model_version_id}") + hash_from_civitai = await self._get_hash_from_civitai(model_version_id) + if hash_from_civitai: + logger.info(f"Got hash from Civitai") + lora['hash'] = hash_from_civitai.lower() # Standardize to lowercase + metadata_updated = True + else: + logger.warning(f"Could not get hash for modelVersionId {model_version_id}") + + # If has hash, check if it's in library + if 'hash' in lora: + hash_value = lora['hash'].lower() # Ensure lowercase when comparing + in_library = self._lora_scanner.has_lora_hash(hash_value) + lora['inLibrary'] = in_library + + # If hash is in library but no file_name, look up and set file_name + if in_library and (not lora.get('file_name') or not lora['file_name']): + lora_path = self._lora_scanner.get_lora_path_by_hash(hash_value) + if lora_path: + file_name = os.path.splitext(os.path.basename(lora_path))[0] + logger.info(f"Found lora in library: {file_name}") + lora['file_name'] = file_name + metadata_updated = True + + # Also get base_model from lora cache if possible + base_model = await self._get_base_model_for_lora(lora_path) + if base_model: + lora['base_model'] = base_model + elif not in_library: + # Lora not in library + logger.info(f"LoRA with hash {hash_value[:8]}... not found in library") + lora['file_name'] = '' + metadata_updated = True + + # Determine the base_model for the recipe based on loras + if recipe_data.get('loras'): + base_model = await self._determine_base_model(recipe_data.get('loras', [])) + if base_model and (not recipe_data.get('base_model') or recipe_data['base_model'] != base_model): + recipe_data['base_model'] = base_model + metadata_updated = True + + return metadata_updated + + async def _determine_base_model(self, loras: List[Dict]) -> Optional[str]: + """Determine the most common base model among LoRAs""" + base_models = {} + + # Count occurrences of each base model + for lora in loras: + if 'hash' in lora: + lora_path = self._lora_scanner.get_lora_path_by_hash(lora['hash']) + if lora_path: + base_model = await self._get_base_model_for_lora(lora_path) + if base_model: + base_models[base_model] = base_models.get(base_model, 0) + 1 + + # Return the most common base model + if base_models: + return max(base_models.items(), key=lambda x: x[1])[0] + return None + + async def _get_base_model_for_lora(self, lora_path: str) -> Optional[str]: + """Get base model for a LoRA from cache""" + try: + if not self._lora_scanner: + return None + + cache = await self._lora_scanner.get_cached_data() + if not cache or not cache.raw_data: + return None + + # Find matching lora in cache + for lora in cache.raw_data: + if lora.get('file_path') == lora_path: + return lora.get('base_model') + + return None + except Exception as e: + logger.error(f"Error getting base model for lora: {e}") + return None \ No newline at end of file diff --git a/py/utils/exif_utils.py b/py/utils/exif_utils.py new file mode 100644 index 00000000..f9b78cfa --- /dev/null +++ b/py/utils/exif_utils.py @@ -0,0 +1,110 @@ +import piexif +import json +import logging +from typing import Dict, Optional, Any +from io import BytesIO +from PIL import Image +import re + +logger = logging.getLogger(__name__) + +class ExifUtils: + """Utility functions for working with EXIF data in images""" + + @staticmethod + def extract_user_comment(image_path: str) -> Optional[str]: + """Extract UserComment field from image EXIF data""" + try: + exif_dict = piexif.load(image_path) + + if piexif.ExifIFD.UserComment in exif_dict.get('Exif', {}): + user_comment = exif_dict['Exif'][piexif.ExifIFD.UserComment] + if isinstance(user_comment, bytes): + if user_comment.startswith(b'UNICODE\0'): + user_comment = user_comment[8:].decode('utf-16be') + else: + user_comment = user_comment.decode('utf-8', errors='ignore') + return user_comment + return None + except Exception as e: + logger.error(f"Error extracting EXIF data from {image_path}: {e}") + return None + + @staticmethod + def update_user_comment(image_path: str, user_comment: str) -> bool: + """Update UserComment field in image EXIF data""" + try: + # Load the image and its EXIF data + with Image.open(image_path) as img: + exif_dict = piexif.load(img.info.get('exif', b'')) + + # If no Exif dictionary exists, create one + if 'Exif' not in exif_dict: + exif_dict['Exif'] = {} + + # Update the UserComment field + if isinstance(user_comment, str): + user_comment_bytes = user_comment.encode('utf-8') + else: + user_comment_bytes = user_comment + + exif_dict['Exif'][piexif.ExifIFD.UserComment] = user_comment_bytes + + # Convert EXIF dict back to bytes + exif_bytes = piexif.dump(exif_dict) + + # Save the image with updated EXIF data + img.save(image_path, exif=exif_bytes) + + return True + except Exception as e: + logger.error(f"Error updating EXIF data in {image_path}: {e}") + return False + + @staticmethod + def parse_recipe_metadata(user_comment: str) -> Dict[str, Any]: + """Parse recipe metadata from UserComment""" + try: + # Split by 'Negative prompt:' to get the prompt + parts = user_comment.split('Negative prompt:', 1) + prompt = parts[0].strip() + + # Initialize metadata with prompt + metadata = {"prompt": prompt, "loras": [], "checkpoint": None} + + # Extract additional fields if available + if len(parts) > 1: + negative_and_params = parts[1] + + # Extract negative prompt + if "Steps:" in negative_and_params: + neg_prompt = negative_and_params.split("Steps:", 1)[0].strip() + metadata["negative_prompt"] = neg_prompt + + # Extract key-value parameters (Steps, Sampler, CFG scale, etc.) + param_pattern = r'([A-Za-z ]+): ([^,]+)' + params = re.findall(param_pattern, negative_and_params) + for key, value in params: + clean_key = key.strip().lower().replace(' ', '_') + metadata[clean_key] = value.strip() + + # Extract Civitai resources + if 'Civitai resources:' in user_comment: + resources_part = user_comment.split('Civitai resources:', 1)[1] + if '],' in resources_part: + resources_json = resources_part.split('],', 1)[0] + ']' + try: + resources = json.loads(resources_json) + # Filter loras and checkpoints + for resource in resources: + if resource.get('type') == 'lora': + metadata['loras'].append(resource) + elif resource.get('type') == 'checkpoint': + metadata['checkpoint'] = resource + except json.JSONDecodeError: + pass + + return metadata + except Exception as e: + logger.error(f"Error parsing recipe metadata: {e}") + return {"prompt": user_comment, "loras": [], "checkpoint": None} \ No newline at end of file diff --git a/static/js/recipes.js b/static/js/recipes.js new file mode 100644 index 00000000..55bcd37b --- /dev/null +++ b/static/js/recipes.js @@ -0,0 +1,166 @@ +// Recipe manager module +import { showToast } from './utils/uiHelpers.js'; +import { state } from './state/index.js'; + +class RecipeManager { + constructor() { + this.currentPage = 1; + this.pageSize = 20; + this.sortBy = 'date'; + this.filterParams = {}; + + this.init(); + } + + init() { + // Initialize event listeners + this.initEventListeners(); + + // Load initial set of recipes + this.loadRecipes(); + } + + initEventListeners() { + // Sort select + const sortSelect = document.getElementById('sortSelect'); + if (sortSelect) { + sortSelect.addEventListener('change', () => { + this.sortBy = sortSelect.value; + this.loadRecipes(); + }); + } + + // Search input + const searchInput = document.getElementById('searchInput'); + if (searchInput) { + let debounceTimeout; + searchInput.addEventListener('input', () => { + clearTimeout(debounceTimeout); + debounceTimeout = setTimeout(() => { + this.filterParams.search = searchInput.value; + this.loadRecipes(); + }, 300); + }); + } + } + + async loadRecipes() { + try { + // Show loading indicator + document.body.classList.add('loading'); + + // Build query parameters + const params = new URLSearchParams({ + page: this.currentPage, + page_size: this.pageSize, + sort_by: this.sortBy + }); + + // Add search filter if present + if (this.filterParams.search) { + params.append('search', this.filterParams.search); + } + + // Add other filters + if (this.filterParams.baseModels && this.filterParams.baseModels.length) { + params.append('base_models', this.filterParams.baseModels.join(',')); + } + + // Fetch recipes + const response = await fetch(`/api/recipes?${params.toString()}`); + + if (!response.ok) { + throw new Error(`Failed to load recipes: ${response.statusText}`); + } + + const data = await response.json(); + + // Update recipes grid + this.updateRecipesGrid(data); + + } catch (error) { + console.error('Error loading recipes:', error); + showToast('Failed to load recipes', 'error'); + } finally { + // Hide loading indicator + document.body.classList.remove('loading'); + } + } + + updateRecipesGrid(data) { + const grid = document.getElementById('recipeGrid'); + if (!grid) return; + + // Check if data exists and has items + if (!data.items || data.items.length === 0) { + grid.innerHTML = ` +
+ `; + return; + } + + // Clear grid + grid.innerHTML = ''; + + // Create recipe cards + data.items.forEach(recipe => { + const card = this.createRecipeCard(recipe); + grid.appendChild(card); + }); + } + + createRecipeCard(recipe) { + const card = document.createElement('div'); + card.className = 'recipe-card'; + card.dataset.filePath = recipe.file_path; + card.dataset.title = recipe.title; + card.dataset.created = recipe.created_date; + + // Get base model from first lora if available + const baseModel = recipe.loras && recipe.loras.length > 0 + ? recipe.loras[0].baseModel + : ''; + + card.innerHTML = ` +
+ Scanning and building recipe cache. This may take a few moments...
+