mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-05-06 08:26:45 -03:00
feat(excluded-models): add excluded management view
This commit is contained in:
@@ -179,6 +179,57 @@ class BaseModelService(ABC):
|
||||
)
|
||||
return paginated
|
||||
|
||||
async def get_excluded_paginated_data(
|
||||
self,
|
||||
page: int,
|
||||
page_size: int,
|
||||
sort_by: str = "name",
|
||||
search: str = None,
|
||||
fuzzy_search: bool = False,
|
||||
search_options: dict = None,
|
||||
**kwargs,
|
||||
) -> Dict:
|
||||
"""Get paginated excluded model data."""
|
||||
excluded_paths = list(self.scanner.get_excluded_models())
|
||||
excluded_entries: List[Dict[str, Any]] = []
|
||||
stale_paths: List[str] = []
|
||||
|
||||
for file_path in excluded_paths:
|
||||
if not file_path or not os.path.exists(file_path):
|
||||
stale_paths.append(file_path)
|
||||
continue
|
||||
|
||||
entry = await self._build_excluded_entry(file_path)
|
||||
if entry:
|
||||
excluded_entries.append(entry)
|
||||
else:
|
||||
stale_paths.append(file_path)
|
||||
|
||||
if stale_paths:
|
||||
current_excluded = getattr(self.scanner, "_excluded_models", None)
|
||||
if isinstance(current_excluded, list):
|
||||
stale_set = set(stale_paths)
|
||||
self.scanner._excluded_models = [
|
||||
path for path in current_excluded if path not in stale_set
|
||||
]
|
||||
persist_current_cache = getattr(self.scanner, "_persist_current_cache", None)
|
||||
if callable(persist_current_cache):
|
||||
await persist_current_cache()
|
||||
|
||||
excluded_entries = self._sort_entries(excluded_entries, sort_by)
|
||||
|
||||
if search:
|
||||
excluded_entries = await self._apply_search_filters(
|
||||
excluded_entries,
|
||||
search,
|
||||
fuzzy_search,
|
||||
search_options,
|
||||
)
|
||||
|
||||
paginated = self._paginate(excluded_entries, page, page_size)
|
||||
paginated["items"] = await self._annotate_update_flags(paginated["items"])
|
||||
return paginated
|
||||
|
||||
async def _fetch_with_usage_sort(self, sort_params):
|
||||
"""Fetch data sorted by usage count (desc/asc)."""
|
||||
cache = await self.cache_repository.get_cache()
|
||||
@@ -218,6 +269,62 @@ class BaseModelService(ABC):
|
||||
)
|
||||
return annotated
|
||||
|
||||
def _sort_entries(self, data: List[Dict[str, Any]], sort_by: str) -> List[Dict[str, Any]]:
|
||||
sort_params = self.cache_repository.parse_sort(sort_by)
|
||||
key_name = sort_params.key
|
||||
|
||||
if key_name == "date":
|
||||
key_fn = lambda item: (
|
||||
float(item.get("modified", 0.0) or 0.0),
|
||||
(item.get("model_name") or item.get("file_name") or "").lower(),
|
||||
item.get("file_path", "").lower(),
|
||||
)
|
||||
elif key_name == "size":
|
||||
key_fn = lambda item: (
|
||||
int(item.get("size", 0) or 0),
|
||||
(item.get("model_name") or item.get("file_name") or "").lower(),
|
||||
item.get("file_path", "").lower(),
|
||||
)
|
||||
elif key_name == "usage":
|
||||
key_fn = lambda item: (
|
||||
int(item.get("usage_count", 0) or 0),
|
||||
(item.get("model_name") or item.get("file_name") or "").lower(),
|
||||
item.get("file_path", "").lower(),
|
||||
)
|
||||
else:
|
||||
key_fn = lambda item: (
|
||||
(item.get("model_name") or item.get("file_name") or "").lower(),
|
||||
item.get("file_path", "").lower(),
|
||||
)
|
||||
|
||||
return sorted(data, key=key_fn, reverse=sort_params.order == "desc")
|
||||
|
||||
async def _build_excluded_entry(self, file_path: str) -> Optional[Dict[str, Any]]:
|
||||
root_path = self.scanner._find_root_for_file(file_path)
|
||||
if not root_path:
|
||||
return None
|
||||
|
||||
metadata, should_skip = await MetadataManager.load_metadata(
|
||||
file_path,
|
||||
self.metadata_class,
|
||||
)
|
||||
if should_skip:
|
||||
return None
|
||||
|
||||
if metadata is None:
|
||||
metadata = await self.scanner._create_default_metadata(file_path)
|
||||
if metadata is None:
|
||||
return None
|
||||
|
||||
metadata = self.scanner.adjust_metadata(metadata, file_path, root_path)
|
||||
folder = os.path.dirname(os.path.relpath(file_path, root_path)).replace(
|
||||
os.path.sep, "/"
|
||||
)
|
||||
entry = self.scanner._build_cache_entry(metadata, folder=folder)
|
||||
entry = self.scanner.adjust_cached_entry(entry)
|
||||
entry["exclude"] = True
|
||||
return entry
|
||||
|
||||
async def _apply_hash_filters(
|
||||
self, data: List[Dict], hash_filters: Dict
|
||||
) -> List[Dict]:
|
||||
|
||||
@@ -42,6 +42,7 @@ class CheckpointService(BaseModelService):
|
||||
"notes": checkpoint_data.get("notes", ""),
|
||||
"sub_type": sub_type,
|
||||
"favorite": checkpoint_data.get("favorite", False),
|
||||
"exclude": bool(checkpoint_data.get("exclude", False)),
|
||||
"update_available": bool(checkpoint_data.get("update_available", False)),
|
||||
"skip_metadata_refresh": bool(checkpoint_data.get("skip_metadata_refresh", False)),
|
||||
"civitai": self.filter_civitai_data(checkpoint_data.get("civitai", {}), minimal=True)
|
||||
|
||||
@@ -42,6 +42,7 @@ class EmbeddingService(BaseModelService):
|
||||
"notes": embedding_data.get("notes", ""),
|
||||
"sub_type": sub_type,
|
||||
"favorite": embedding_data.get("favorite", False),
|
||||
"exclude": bool(embedding_data.get("exclude", False)),
|
||||
"update_available": bool(embedding_data.get("update_available", False)),
|
||||
"skip_metadata_refresh": bool(embedding_data.get("skip_metadata_refresh", False)),
|
||||
"civitai": self.filter_civitai_data(embedding_data.get("civitai", {}), minimal=True)
|
||||
|
||||
@@ -48,6 +48,7 @@ class LoraService(BaseModelService):
|
||||
"usage_tips": lora_data.get("usage_tips", ""),
|
||||
"notes": lora_data.get("notes", ""),
|
||||
"favorite": lora_data.get("favorite", False),
|
||||
"exclude": bool(lora_data.get("exclude", False)),
|
||||
"update_available": bool(lora_data.get("update_available", False)),
|
||||
"skip_metadata_refresh": bool(
|
||||
lora_data.get("skip_metadata_refresh", False)
|
||||
|
||||
@@ -8,6 +8,7 @@ from typing import Any, Awaitable, Callable, Dict, Iterable, List, Mapping, Opti
|
||||
|
||||
from ..services.service_registry import ServiceRegistry
|
||||
from ..utils.constants import PREVIEW_EXTENSIONS
|
||||
from ..utils.metadata_manager import MetadataManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -207,11 +208,56 @@ class ModelLifecycleService:
|
||||
|
||||
excluded = getattr(self._scanner, "_excluded_models", None)
|
||||
if isinstance(excluded, list):
|
||||
excluded.append(file_path)
|
||||
if file_path not in excluded:
|
||||
excluded.append(file_path)
|
||||
|
||||
persist_current_cache = getattr(self._scanner, "_persist_current_cache", None)
|
||||
if callable(persist_current_cache):
|
||||
await persist_current_cache()
|
||||
|
||||
message = f"Model {os.path.basename(file_path)} excluded"
|
||||
return {"success": True, "message": message}
|
||||
|
||||
async def unexclude_model(self, file_path: str) -> Dict[str, object]:
|
||||
"""Restore a previously excluded model to the active cache."""
|
||||
|
||||
if not file_path:
|
||||
raise ValueError("Model path is required")
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
raise ValueError("Model file does not exist")
|
||||
|
||||
metadata_path = os.path.splitext(file_path)[0] + ".metadata.json"
|
||||
metadata_payload = await self._metadata_loader(metadata_path)
|
||||
metadata_payload["exclude"] = False
|
||||
|
||||
await self._metadata_manager.save_metadata(file_path, metadata_payload)
|
||||
|
||||
metadata, should_skip = await MetadataManager.load_metadata(
|
||||
file_path,
|
||||
self._scanner.model_class,
|
||||
)
|
||||
if should_skip:
|
||||
metadata = None
|
||||
if metadata is None:
|
||||
metadata = metadata_payload
|
||||
|
||||
excluded = getattr(self._scanner, "_excluded_models", None)
|
||||
if isinstance(excluded, list):
|
||||
self._scanner._excluded_models = [
|
||||
path for path in excluded if path != file_path
|
||||
]
|
||||
|
||||
await self._scanner.update_single_model_cache(
|
||||
file_path,
|
||||
file_path,
|
||||
metadata,
|
||||
recalculate_type=True,
|
||||
)
|
||||
|
||||
message = f"Model {os.path.basename(file_path)} restored"
|
||||
return {"success": True, "message": message}
|
||||
|
||||
async def bulk_delete_models(self, file_paths: Iterable[str]) -> Dict[str, object]:
|
||||
"""Delete a collection of models via the scanner bulk operation."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user