mirror of
https://github.com/willmiao/ComfyUI-Lora-Manager.git
synced 2026-05-06 08:26:45 -03:00
136 lines
4.4 KiB
Python
136 lines
4.4 KiB
Python
"""Service for managing autocomplete via TagFTSIndex.
|
|
|
|
This service provides full-text search capabilities for Danbooru/e621 tags
|
|
with category filtering and enriched results including post counts.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
_EMBEDDED_COMMAND_PATTERN = re.compile(r"\s/\w")
|
|
class CustomWordsService:
|
|
"""Service for autocomplete via TagFTSIndex.
|
|
|
|
This service:
|
|
- Uses TagFTSIndex for fast full-text search of Danbooru/e621 tags
|
|
- Supports category-based filtering
|
|
- Returns enriched results with category and post_count
|
|
- Provides sub-100ms search times for 221k+ tags
|
|
"""
|
|
|
|
_instance: Optional[CustomWordsService] = None
|
|
_initialized: bool = False
|
|
|
|
def __new__(cls) -> CustomWordsService:
|
|
if cls._instance is None:
|
|
cls._instance = super().__new__(cls)
|
|
return cls._instance
|
|
|
|
def __init__(self) -> None:
|
|
if self._initialized:
|
|
return
|
|
|
|
self._tag_index: Optional[Any] = None
|
|
self._initialized = True
|
|
|
|
@classmethod
|
|
def get_instance(cls) -> CustomWordsService:
|
|
"""Get the singleton instance of CustomWordsService."""
|
|
if cls._instance is None:
|
|
cls._instance = cls()
|
|
return cls._instance
|
|
|
|
def _get_tag_index(self):
|
|
"""Get or create the TagFTSIndex instance (lazy initialization)."""
|
|
if self._tag_index is None:
|
|
try:
|
|
from .tag_fts_index import get_tag_fts_index
|
|
|
|
self._tag_index = get_tag_fts_index()
|
|
except Exception as e:
|
|
logger.warning(f"Failed to initialize TagFTSIndex: {e}")
|
|
self._tag_index = None
|
|
return self._tag_index
|
|
|
|
def search_words(
|
|
self,
|
|
search_term: str,
|
|
limit: int = 20,
|
|
offset: int = 0,
|
|
categories: Optional[List[int]] = None,
|
|
enriched: bool = False,
|
|
) -> List[Dict[str, Any]]:
|
|
"""Search tags using TagFTSIndex with category filtering.
|
|
|
|
Args:
|
|
search_term: The search term to match against.
|
|
limit: Maximum number of results to return.
|
|
offset: Number of results to skip.
|
|
categories: Optional list of category IDs to filter by.
|
|
enriched: If True, always return enriched results with category
|
|
and post_count (default behavior now).
|
|
|
|
Returns:
|
|
List of dicts with tag_name, category, and post_count.
|
|
"""
|
|
normalized_search = search_term.strip()
|
|
if not normalized_search:
|
|
return []
|
|
|
|
# Prompt widgets should only send the active token, but guard against
|
|
# accidental full-prompt queries reaching the FTS path.
|
|
if (
|
|
"__" in normalized_search
|
|
or "," in normalized_search
|
|
or ">" in normalized_search
|
|
or "\n" in normalized_search
|
|
or "\r" in normalized_search
|
|
or _EMBEDDED_COMMAND_PATTERN.search(normalized_search)
|
|
):
|
|
logger.debug("Skipping prompt-like custom words query: %s", normalized_search)
|
|
return []
|
|
|
|
logger.info(
|
|
"LM custom words service start search=%r categories=%s limit=%s offset=%s enriched=%s",
|
|
normalized_search,
|
|
categories,
|
|
limit,
|
|
offset,
|
|
enriched,
|
|
)
|
|
|
|
tag_index = self._get_tag_index()
|
|
if tag_index is not None:
|
|
logger.info(
|
|
"LM custom words service tag_index ready=%s indexing=%s",
|
|
getattr(tag_index, "is_ready", lambda: "unknown")(),
|
|
getattr(tag_index, "is_indexing", lambda: "unknown")(),
|
|
)
|
|
|
|
results = tag_index.search(
|
|
normalized_search, categories=categories, limit=limit, offset=offset
|
|
)
|
|
logger.info(
|
|
"LM custom words service done search=%r result_count=%s",
|
|
normalized_search,
|
|
len(results),
|
|
)
|
|
return results
|
|
|
|
logger.debug("TagFTSIndex not available, returning empty results")
|
|
return []
|
|
|
|
|
|
def get_custom_words_service() -> CustomWordsService:
|
|
"""Factory function to get the CustomWordsService singleton."""
|
|
return CustomWordsService.get_instance()
|
|
|
|
|
|
__all__ = ["CustomWordsService", "get_custom_words_service"]
|