diff --git a/py/utils/example_images_download_manager.py b/py/utils/example_images_download_manager.py index 1c0f0866..973797dd 100644 --- a/py/utils/example_images_download_manager.py +++ b/py/utils/example_images_download_manager.py @@ -265,16 +265,7 @@ class DownloadManager: # Save final progress to file try: - progress_file = os.path.join(output_dir, '.download_progress.json') - with open(progress_file, 'w', encoding='utf-8') as f: - json.dump({ - 'processed_models': list(download_progress['processed_models']), - 'refreshed_models': list(download_progress['refreshed_models']), - 'completed': download_progress['completed'], - 'total': download_progress['total'], - 'last_update': time.time(), - 'status': download_progress['status'] - }, f, indent=2) + DownloadManager._save_progress(output_dir) except Exception as e: logger.error(f"Failed to save progress file: {e}") @@ -377,13 +368,32 @@ class DownloadManager: global download_progress try: progress_file = os.path.join(output_dir, '.download_progress.json') + + # Read existing progress file if it exists + existing_data = {} + if os.path.exists(progress_file): + try: + with open(progress_file, 'r', encoding='utf-8') as f: + existing_data = json.load(f) + except Exception as e: + logger.warning(f"Failed to read existing progress file: {e}") + + # Create new progress data + progress_data = { + 'processed_models': list(download_progress['processed_models']), + 'refreshed_models': list(download_progress['refreshed_models']), + 'completed': download_progress['completed'], + 'total': download_progress['total'], + 'last_update': time.time() + } + + # Preserve existing fields (especially naming_version) + for key, value in existing_data.items(): + if key not in progress_data: + progress_data[key] = value + + # Write updated progress data with open(progress_file, 'w', encoding='utf-8') as f: - json.dump({ - 'processed_models': list(download_progress['processed_models']), - 'refreshed_models': list(download_progress['refreshed_models']), - 'completed': download_progress['completed'], - 'total': download_progress['total'], - 'last_update': time.time() - }, f, indent=2) + json.dump(progress_data, f, indent=2) except Exception as e: logger.error(f"Failed to save progress file: {e}") \ No newline at end of file diff --git a/py/utils/example_images_metadata.py b/py/utils/example_images_metadata.py index 020b0b77..496d5ad0 100644 --- a/py/utils/example_images_metadata.py +++ b/py/utils/example_images_metadata.py @@ -1,8 +1,11 @@ import logging import os +import re from ..utils.metadata_manager import MetadataManager from ..utils.routes_common import ModelRouteUtils from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS +from ..utils.exif_utils import ExifUtils +from ..recipes.constants import GEN_PARAM_KEYS logger = logging.getLogger(__name__) @@ -233,6 +236,24 @@ class MetadataUpdater: "hasPositivePrompt": False } + # Extract and parse metadata if this is an image + if not is_video: + try: + # Extract metadata from image + extracted_metadata = ExifUtils.extract_image_metadata(path) + + if extracted_metadata: + # Parse the extracted metadata to get generation parameters + parsed_meta = MetadataUpdater._parse_image_metadata(extracted_metadata) + + if parsed_meta: + image_entry["meta"] = parsed_meta + image_entry["hasMeta"] = True + image_entry["hasPositivePrompt"] = bool(parsed_meta.get("prompt", "")) + logger.debug(f"Extracted metadata from {os.path.basename(path)}") + except Exception as e: + logger.warning(f"Failed to extract metadata from {os.path.basename(path)}: {e}") + # If it's an image, try to get actual dimensions try: from PIL import Image @@ -272,4 +293,98 @@ class MetadataUpdater: except Exception as e: logger.error(f"Failed to update metadata after import: {e}", exc_info=True) - return [], [] \ No newline at end of file + return [], [] + + @staticmethod + def _parse_image_metadata(user_comment): + """Parse metadata from image to extract generation parameters + + Args: + user_comment: Metadata string extracted from image + + Returns: + dict: Parsed metadata with generation parameters + """ + if not user_comment: + return None + + try: + # Initialize metadata dictionary + metadata = {} + + # Split on Negative prompt if it exists + if "Negative prompt:" in user_comment: + parts = user_comment.split('Negative prompt:', 1) + prompt = parts[0].strip() + negative_and_params = parts[1] if len(parts) > 1 else "" + else: + # No negative prompt section + param_start = re.search(r'Steps: \d+', user_comment) + if param_start: + prompt = user_comment[:param_start.start()].strip() + negative_and_params = user_comment[param_start.start():] + else: + prompt = user_comment.strip() + negative_and_params = "" + + # Add prompt if it's in GEN_PARAM_KEYS + if 'prompt' in GEN_PARAM_KEYS: + metadata['prompt'] = prompt + + # Extract negative prompt and parameters + if negative_and_params: + # If we split on "Negative prompt:", check for params section + if "Negative prompt:" in user_comment: + param_start = re.search(r'Steps: ', negative_and_params) + if param_start: + neg_prompt = negative_and_params[:param_start.start()].strip() + if 'negative_prompt' in GEN_PARAM_KEYS: + metadata['negative_prompt'] = neg_prompt + params_section = negative_and_params[param_start.start():] + else: + if 'negative_prompt' in GEN_PARAM_KEYS: + metadata['negative_prompt'] = negative_and_params.strip() + params_section = "" + else: + # No negative prompt, entire section is params + params_section = negative_and_params + + # Extract generation parameters + if params_section: + # Extract basic parameters + param_pattern = r'([A-Za-z\s]+): ([^,]+)' + params = re.findall(param_pattern, params_section) + + for key, value in params: + clean_key = key.strip().lower().replace(' ', '_') + + # Skip if not in recognized gen param keys + if clean_key not in GEN_PARAM_KEYS: + continue + + # Convert numeric values + if clean_key in ['steps', 'seed']: + try: + metadata[clean_key] = int(value.strip()) + except ValueError: + metadata[clean_key] = value.strip() + elif clean_key in ['cfg_scale']: + try: + metadata[clean_key] = float(value.strip()) + except ValueError: + metadata[clean_key] = value.strip() + else: + metadata[clean_key] = value.strip() + + # Extract size if available and add if a recognized key + size_match = re.search(r'Size: (\d+)x(\d+)', params_section) + if size_match and 'size' in GEN_PARAM_KEYS: + width, height = size_match.groups() + metadata['size'] = f"{width}x{height}" + + # Return metadata if we have any entries + return metadata if metadata else None + + except Exception as e: + logger.error(f"Error parsing image metadata: {e}", exc_info=True) + return None \ No newline at end of file diff --git a/py/utils/example_images_processor.py b/py/utils/example_images_processor.py index 240d366a..1110e857 100644 --- a/py/utils/example_images_processor.py +++ b/py/utils/example_images_processor.py @@ -9,6 +9,7 @@ from ..utils.constants import SUPPORTED_MEDIA_EXTENSIONS from ..services.service_registry import ServiceRegistry from ..services.settings_manager import settings from .example_images_metadata import MetadataUpdater +from ..utils.metadata_manager import MetadataManager logger = logging.getLogger(__name__) @@ -463,9 +464,8 @@ class ExampleImagesProcessor: model_copy.pop('folder', None) # Write metadata to file - from ..utils.metadata_manager import MetadataManager await MetadataManager.save_metadata(file_path, model_copy) - logger.info(f"Saved updated metadata for {model_data.get('model_name')}") + logger.debug(f"Saved updated metadata for {model_data.get('model_name')}") except Exception as e: logger.error(f"Failed to save metadata: {str(e)}") return web.json_response({ diff --git a/py/utils/file_utils.py b/py/utils/file_utils.py index 359cfc34..681d1b6f 100644 --- a/py/utils/file_utils.py +++ b/py/utils/file_utils.py @@ -63,193 +63,4 @@ def find_preview_file(base_name: str, dir_path: str) -> str: def normalize_path(path: str) -> str: """Normalize file path to use forward slashes""" - return path.replace(os.sep, "/") if path else path - -async def get_file_info(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]: - """Get basic file information as a model metadata object""" - # First check if file actually exists and resolve symlinks - try: - real_path = os.path.realpath(file_path) - if not os.path.exists(real_path): - return None - except Exception as e: - logger.error(f"Error checking file existence for {file_path}: {e}") - return None - - base_name = os.path.splitext(os.path.basename(file_path))[0] - dir_path = os.path.dirname(file_path) - - preview_url = find_preview_file(base_name, dir_path) - - # Check if a .json file exists with SHA256 hash to avoid recalculation - json_path = f"{os.path.splitext(file_path)[0]}.json" - sha256 = None - if os.path.exists(json_path): - try: - with open(json_path, 'r', encoding='utf-8') as f: - json_data = json.load(f) - if 'sha256' in json_data: - sha256 = json_data['sha256'].lower() - logger.debug(f"Using SHA256 from .json file for {file_path}") - except Exception as e: - logger.error(f"Error reading .json file for {file_path}: {e}") - - # If SHA256 is still not found, check for a .sha256 file - if sha256 is None: - sha256_file = f"{os.path.splitext(file_path)[0]}.sha256" - if os.path.exists(sha256_file): - try: - with open(sha256_file, 'r', encoding='utf-8') as f: - sha256 = f.read().strip().lower() - logger.debug(f"Using SHA256 from .sha256 file for {file_path}") - except Exception as e: - logger.error(f"Error reading .sha256 file for {file_path}: {e}") - - try: - # If we didn't get SHA256 from the .json file, calculate it - if not sha256: - start_time = time.time() - sha256 = await calculate_sha256(real_path) - logger.debug(f"Calculated SHA256 for {file_path} in {time.time() - start_time:.2f} seconds") - - # Create default metadata based on model class - if model_class == CheckpointMetadata: - metadata = CheckpointMetadata( - file_name=base_name, - model_name=base_name, - file_path=normalize_path(file_path), - size=os.path.getsize(real_path), - modified=os.path.getmtime(real_path), - sha256=sha256, - base_model="Unknown", # Will be updated later - preview_url=normalize_path(preview_url), - tags=[], - modelDescription="", - model_type="checkpoint" - ) - - # Extract checkpoint-specific metadata - # model_info = await extract_checkpoint_metadata(real_path) - # metadata.base_model = model_info['base_model'] - # if 'model_type' in model_info: - # metadata.model_type = model_info['model_type'] - - else: # Default to LoraMetadata - metadata = LoraMetadata( - file_name=base_name, - model_name=base_name, - file_path=normalize_path(file_path), - size=os.path.getsize(real_path), - modified=os.path.getmtime(real_path), - sha256=sha256, - base_model="Unknown", # Will be updated later - usage_tips="{}", - preview_url=normalize_path(preview_url), - tags=[], - modelDescription="" - ) - - # Extract lora-specific metadata - model_info = await extract_lora_metadata(real_path) - metadata.base_model = model_info['base_model'] - - # Save metadata to file - await save_metadata(file_path, metadata) - - return metadata - except Exception as e: - logger.error(f"Error getting file info for {file_path}: {e}") - return None - -async def save_metadata(file_path: str, metadata: BaseModelMetadata) -> None: - """Save metadata to .metadata.json file""" - metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json" - try: - metadata_dict = metadata.to_dict() - metadata_dict['file_path'] = normalize_path(metadata_dict['file_path']) - metadata_dict['preview_url'] = normalize_path(metadata_dict['preview_url']) - - with open(metadata_path, 'w', encoding='utf-8') as f: - json.dump(metadata_dict, f, indent=2, ensure_ascii=False) - except Exception as e: - logger.error(f"Error saving metadata to {metadata_path}: {str(e)}") - -async def load_metadata(file_path: str, model_class: Type[BaseModelMetadata] = LoraMetadata) -> Optional[BaseModelMetadata]: - """Load metadata from .metadata.json file""" - metadata_path = f"{os.path.splitext(file_path)[0]}.metadata.json" - try: - if os.path.exists(metadata_path): - with open(metadata_path, 'r', encoding='utf-8') as f: - data = json.load(f) - - needs_update = False - - # Check and normalize base model name - normalized_base_model = determine_base_model(data['base_model']) - if data['base_model'] != normalized_base_model: - data['base_model'] = normalized_base_model - needs_update = True - - # Compare paths without extensions - stored_path_base = os.path.splitext(data['file_path'])[0] - current_path_base = os.path.splitext(normalize_path(file_path))[0] - if stored_path_base != current_path_base: - data['file_path'] = normalize_path(file_path) - needs_update = True - - # TODO: optimize preview image to webp format if not already done - preview_url = data.get('preview_url', '') - if not preview_url or not os.path.exists(preview_url): - base_name = os.path.splitext(os.path.basename(file_path))[0] - dir_path = os.path.dirname(file_path) - new_preview_url = normalize_path(find_preview_file(base_name, dir_path)) - if new_preview_url != preview_url: - data['preview_url'] = new_preview_url - needs_update = True - else: - if stored_path_base != current_path_base: - # If model location changed, update preview path by replacing old path with new path - preview_file = os.path.basename(preview_url) - new_preview_url = os.path.join(os.path.dirname(file_path), preview_file) - data['preview_url'] = normalize_path(new_preview_url) - needs_update = True - - # Ensure all fields are present - if 'tags' not in data: - data['tags'] = [] - needs_update = True - - if 'modelDescription' not in data: - data['modelDescription'] = "" - needs_update = True - - # For checkpoint metadata - if model_class == CheckpointMetadata and 'model_type' not in data: - data['model_type'] = "checkpoint" - needs_update = True - - # For lora metadata - if model_class == LoraMetadata and 'usage_tips' not in data: - data['usage_tips'] = "{}" - needs_update = True - - # Update preview_nsfw_level if needed - civitai_data = data.get('civitai', {}) - civitai_images = civitai_data.get('images', []) if civitai_data else [] - if (data.get('preview_url') and - data.get('preview_nsfw_level', 0) == 0 and - civitai_images and - civitai_images[0].get('nsfwLevel', 0) != 0): - data['preview_nsfw_level'] = civitai_images[0]['nsfwLevel'] - # TODO: write to metadata file - # needs_update = True - - if needs_update: - with open(metadata_path, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=2, ensure_ascii=False) - - return model_class.from_dict(data) - - except Exception as e: - logger.error(f"Error loading metadata from {metadata_path}: {str(e)}") - return None \ No newline at end of file + return path.replace(os.sep, "/") if path else path \ No newline at end of file