feat: enhance auto-organize functionality with empty directory cleanup and progress reporting

This commit is contained in:
Will Miao
2025-08-13 10:36:31 +08:00
parent a920921570
commit 5a0b3470f1
2 changed files with 104 additions and 50 deletions

View File

@@ -852,7 +852,7 @@ class BaseModelRoutes(ABC):
current_dir = os.path.dirname(file_path) current_dir = os.path.dirname(file_path)
# Skip if already in correct location # Skip if already in correct location
if os.path.normpath(current_dir) == os.path.normpath(target_dir): if current_dir.replace(os.sep, '/') == target_dir.replace(os.sep, '/'):
skipped_count += 1 skipped_count += 1
processed += 1 processed += 1
continue continue
@@ -914,6 +914,25 @@ class BaseModelRoutes(ABC):
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
# Send completion message # Send completion message
await ws_manager.broadcast({
'type': 'auto_organize_progress',
'status': 'cleaning',
'total': total_models,
'processed': processed,
'success': success_count,
'failures': failure_count,
'skipped': skipped_count,
'message': 'Cleaning up empty directories...'
})
# Clean up empty directories after organizing
from ..utils.utils import remove_empty_dirs
cleanup_counts = {}
for root in model_roots:
removed = remove_empty_dirs(root)
cleanup_counts[root] = removed
# Send cleanup completed message
await ws_manager.broadcast({ await ws_manager.broadcast({
'type': 'auto_organize_progress', 'type': 'auto_organize_progress',
'status': 'completed', 'status': 'completed',
@@ -921,7 +940,8 @@ class BaseModelRoutes(ABC):
'processed': processed, 'processed': processed,
'success': success_count, 'success': success_count,
'failures': failure_count, 'failures': failure_count,
'skipped': skipped_count 'skipped': skipped_count,
'cleanup': cleanup_counts
}) })
# Prepare response with limited details # Prepare response with limited details
@@ -933,7 +953,8 @@ class BaseModelRoutes(ABC):
'success': success_count, 'success': success_count,
'skipped': skipped_count, 'skipped': skipped_count,
'failures': failure_count, 'failures': failure_count,
'organization_type': 'flat' if is_flat_structure else 'structured' 'organization_type': 'flat' if is_flat_structure else 'structured',
'cleaned_dirs': cleanup_counts
} }
} }

View File

@@ -133,54 +133,87 @@ def calculate_recipe_fingerprint(loras):
return fingerprint return fingerprint
def calculate_relative_path_for_model(model_data: Dict) -> str: def calculate_relative_path_for_model(model_data: Dict) -> str:
"""Calculate relative path for existing model using template from settings """Calculate relative path for existing model using template from settings
Args:
model_data: Model data from scanner cache
Returns:
Relative path string (empty string for flat structure)
"""
# Get path template from settings, default to '{base_model}/{first_tag}'
path_template = settings.get('download_path_template', '{base_model}/{first_tag}')
# If template is empty, return empty path (flat structure)
if not path_template:
return ''
# Get base model name from model metadata
civitai_data = model_data.get('civitai', {})
# For CivitAI models, prefer civitai data; for non-CivitAI models, use model_data directly
if civitai_data:
base_model = civitai_data.get('baseModel', '')
else:
# Fallback to model_data fields for non-CivitAI models
base_model = model_data.get('base_model', '')
model_tags = model_data.get('tags', []) Args:
model_data: Model data from scanner cache
# Apply mapping if available
base_model_mappings = settings.get('base_model_path_mappings', {})
mapped_base_model = base_model_mappings.get(base_model, base_model)
# Find the first Civitai model tag that exists in model_tags
first_tag = ''
for civitai_tag in CIVITAI_MODEL_TAGS:
if civitai_tag in model_tags:
first_tag = civitai_tag
break
# If no Civitai model tag found, fallback to first tag
if not first_tag and model_tags:
first_tag = model_tags[0]
if not first_tag: Returns:
first_tag = 'no tags' # Default if no tags available Relative path string (empty string for flat structure)
"""
# Get path template from settings, default to '{base_model}/{first_tag}'
path_template = settings.get('download_path_template', '{base_model}/{first_tag}')
# If template is empty, return empty path (flat structure)
if not path_template:
return ''
# Get base model name from model metadata
civitai_data = model_data.get('civitai', {})
# For CivitAI models, prefer civitai data only if 'id' exists; for non-CivitAI models, use model_data directly
if civitai_data and civitai_data.get('id') is not None:
base_model = civitai_data.get('baseModel', '')
else:
# Fallback to model_data fields for non-CivitAI models
base_model = model_data.get('base_model', '')
model_tags = model_data.get('tags', [])
# Apply mapping if available
base_model_mappings = settings.get('base_model_path_mappings', {})
mapped_base_model = base_model_mappings.get(base_model, base_model)
# Find the first Civitai model tag that exists in model_tags
first_tag = ''
for civitai_tag in CIVITAI_MODEL_TAGS:
if civitai_tag in model_tags:
first_tag = civitai_tag
break
# If no Civitai model tag found, fallback to first tag
if not first_tag and model_tags:
first_tag = model_tags[0]
if not first_tag:
first_tag = 'no tags' # Default if no tags available
# Format the template with available data
formatted_path = path_template
formatted_path = formatted_path.replace('{base_model}', mapped_base_model)
formatted_path = formatted_path.replace('{first_tag}', first_tag)
return formatted_path
def remove_empty_dirs(path):
"""Recursively remove empty directories starting from the given path.
Args:
path (str): Root directory to start cleaning from
# Format the template with available data Returns:
formatted_path = path_template int: Number of empty directories removed
formatted_path = formatted_path.replace('{base_model}', mapped_base_model) """
formatted_path = formatted_path.replace('{first_tag}', first_tag) removed_count = 0
if not os.path.isdir(path):
return removed_count
return formatted_path # List all files in directory
files = os.listdir(path)
# Process all subdirectories first
for file in files:
full_path = os.path.join(path, file)
if os.path.isdir(full_path):
removed_count += remove_empty_dirs(full_path)
# Check if directory is now empty (after processing subdirectories)
if not os.listdir(path):
try:
os.rmdir(path)
removed_count += 1
except OSError:
pass
return removed_count