refactor(autocomplete): remove old CSV fallback, use TagFTSIndex exclusively

Remove all autocomplete.txt parsing logic and fallback code, simplifying
the service to use only TagFTSIndex for Danbooru/e621 tag search
with category filtering.

- Remove WordEntry dataclass and _words_cache, _file_path attributes
- Remove _determine_file_path(), get_file_path(), load_words(), save_words(),
  get_content(), _parse_csv_content() methods
- Simplify search_words() to only use TagFTSIndex, always returning
  enriched results with {tag_name, category, post_count}
- Remove GET/POST /api/lm/custom-words endpoints (unused)
- Keep GET /api/lm/custom-words/search for frontend autocomplete
- Rewrite tests to focus on TagFTSIndex integration

This reduces code by 446 lines and removes untested pysssss plugin
integration. Feature is unreleased so no backward compatibility needed.
This commit is contained in:
Will Miao
2026-01-26 20:36:00 +08:00
parent 31d94d7ea2
commit 7249c9fd4b
4 changed files with 110 additions and 447 deletions

View File

@@ -1,44 +1,25 @@
"""Service for managing custom autocomplete words.
"""Service for managing autocomplete via TagFTSIndex.
This service provides functionality to parse CSV-formatted custom words,
search them with priority-based ranking, and manage storage.
It also integrates with TagFTSIndex to search the Danbooru/e621 tag database
for comprehensive autocomplete suggestions with category filtering.
This service provides full-text search capabilities for Danbooru/e621 tags
with category filtering and enriched results including post counts.
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WordEntry:
"""Represents a single custom word entry."""
text: str
priority: Optional[int] = None
value: Optional[str] = None
def get_insert_text(self) -> str:
"""Get the text to insert when this word is selected."""
return self.value if self.value is not None else self.text
class CustomWordsService:
"""Service for managing custom autocomplete words.
"""Service for autocomplete via TagFTSIndex.
This service:
- Loads custom words from CSV files (sharing with pysssss plugin)
- Parses CSV format: word[,priority] or word[,alias][,priority]
- Searches words with priority-based ranking
- Caches parsed words for performance
- Integrates with TagFTSIndex for Danbooru/e621 tag search
- Uses TagFTSIndex for fast full-text search of Danbooru/e621 tags
- Supports category-based filtering
- Returns enriched results with category and post_count
- Provides sub-100ms search times for 221k+ tags
"""
_instance: Optional[CustomWordsService] = None
@@ -53,13 +34,9 @@ class CustomWordsService:
if self._initialized:
return
self._words_cache: Dict[str, WordEntry] = {}
self._file_path: Optional[Path] = None
self._tag_index: Optional[Any] = None # Lazy-loaded TagFTSIndex
self._tag_index: Optional[Any] = None
self._initialized = True
self._determine_file_path()
@classmethod
def get_instance(cls) -> CustomWordsService:
"""Get the singleton instance of CustomWordsService."""
@@ -67,42 +44,6 @@ class CustomWordsService:
cls._instance = cls()
return cls._instance
def _determine_file_path(self) -> None:
"""Determine file path for custom words.
Priority order:
1. pysssss plugin's user/autocomplete.txt (if exists)
2. Lora Manager's user directory
"""
try:
import folder_paths # type: ignore
comfy_dir = Path(folder_paths.base_path)
except (ImportError, AttributeError):
# Fallback: compute from __file__
comfy_dir = Path(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
pysssss_user_dir = comfy_dir / "custom_nodes" / "comfyui-custom-scripts" / "user"
if pysssss_user_dir.exists():
pysssss_file = pysssss_user_dir / "autocomplete.txt"
if pysssss_file.exists():
self._file_path = pysssss_file
logger.info(f"Using pysssss custom words file: {pysssss_file}")
return
# Fallback to Lora Manager's user directory
from .settings_manager import get_settings_manager
settings_manager = get_settings_manager()
lm_user_dir = Path(settings_manager._get_user_config_directory())
lm_user_dir.mkdir(parents=True, exist_ok=True)
self._file_path = lm_user_dir / "autocomplete.txt"
logger.info(f"Using Lora Manager custom words file: {self._file_path}")
def get_file_path(self) -> Optional[Path]:
"""Get the current file path for custom words."""
return self._file_path
def _get_tag_index(self):
"""Get or create the TagFTSIndex instance (lazy initialization)."""
if self._tag_index is None:
@@ -114,198 +55,32 @@ class CustomWordsService:
self._tag_index = None
return self._tag_index
def load_words(self) -> Dict[str, WordEntry]:
"""Load and parse words from the custom words file.
Returns:
Dictionary mapping text to WordEntry objects.
"""
if self._file_path is None or not self._file_path.exists():
self._words_cache = {}
return self._words_cache
try:
content = self._file_path.read_text(encoding="utf-8")
self._words_cache = self._parse_csv_content(content)
logger.debug(f"Loaded {len(self._words_cache)} custom words")
except Exception as e:
logger.error(f"Error loading custom words: {e}", exc_info=True)
self._words_cache = {}
return self._words_cache
def _parse_csv_content(self, content: str) -> Dict[str, WordEntry]:
"""Parse CSV content into word entries.
Supported formats:
- word
- word,priority
Args:
content: CSV-formatted string with one word per line.
Returns:
Dictionary mapping text to WordEntry objects.
"""
words: Dict[str, WordEntry] = {}
for line in content.splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(",")
parts = [p.strip() for p in parts if p.strip()]
if not parts:
continue
text = parts[0]
priority = None
value = None
if len(parts) == 2:
try:
priority = int(parts[1])
except ValueError:
# Not a priority, could be alias or unknown format
pass
if text:
words[text] = WordEntry(text=text, priority=priority, value=value)
return words
def search_words(
self,
search_term: str,
limit: int = 20,
categories: Optional[List[int]] = None,
enriched: bool = False
) -> Union[List[str], List[Dict[str, Any]]]:
"""Search custom words with priority-based ranking.
When categories are provided or enriched is True, uses TagFTSIndex to search
the Danbooru/e621 tag database and returns enriched results with category
and post_count.
Matching priority (for custom words):
1. Words with priority (sorted by priority descending)
2. Prefix matches (word starts with search term)
3. Include matches (word contains search term)
) -> List[Dict[str, Any]]:
"""Search tags using TagFTSIndex with category filtering.
Args:
search_term: The search term to match against.
limit: Maximum number of results to return.
categories: Optional list of category IDs to filter by.
When provided, searches TagFTSIndex instead of custom words.
enriched: If True, return enriched results even without category filtering.
enriched: If True, always return enriched results with category
and post_count (default behavior now).
Returns:
List of matching word texts (when categories is None and enriched is False), or
List of dicts with tag_name, category, post_count (when categories is provided
or enriched is True).
List of dicts with tag_name, category, and post_count.
"""
# Use TagFTSIndex when categories are specified or when explicitly requested
tag_index = self._get_tag_index()
if tag_index is not None:
# Search the tag database
results = tag_index.search(search_term, categories=categories, limit=limit)
if results:
# If categories were specified or enriched requested, return enriched results
if categories is not None or enriched:
return results
# Otherwise, convert to simple string list for backward compatibility
return [r["tag_name"] for r in results]
# Fall through to custom words if no tag results
return results
# Fall back to custom words search
words = self._words_cache if self._words_cache else self.load_words()
if not search_term:
term_lower = ""
else:
term_lower = search_term.lower()
priority_matches = []
prefix_matches = []
include_matches = []
for text, entry in words.items():
text_lower = text.lower()
pos = text_lower.find(term_lower)
if pos == -1:
continue
if entry.priority is not None:
priority_matches.append((entry, pos))
elif pos == 0:
prefix_matches.append((entry, pos))
else:
include_matches.append((entry, pos))
# Sort priority matches: by priority desc, then by length asc, then alphabetically
priority_matches.sort(
key=lambda x: (-x[0].priority if x[0].priority else 0, len(x[0].text), x[0].text)
)
# Sort prefix and include matches by position, then length, then alphabetically
prefix_matches.sort(key=lambda x: (x[1], len(x[0].text), x[0].text))
include_matches.sort(key=lambda x: (x[1], len(x[0].text), x[0].text))
# Combine results: 20% top priority + all prefix matches + rest of priority + all include
top_priority_count = max(1, limit // 5)
text_results = (
[entry.text for entry, _ in priority_matches[:top_priority_count]]
+ [entry.text for entry, _ in prefix_matches]
+ [entry.text for entry, _ in priority_matches[top_priority_count:]]
+ [entry.text for entry, _ in include_matches]
)
# If categories were requested but tag index failed, return empty enriched format
if categories is not None:
return [{"tag_name": t, "category": 0, "post_count": 0} for t in text_results[:limit]]
return text_results[:limit]
def save_words(self, content: str) -> bool:
"""Save custom words content to file.
Args:
content: CSV-formatted content to save.
Returns:
True if save was successful, False otherwise.
"""
if self._file_path is None:
logger.error("No file path configured for custom words")
return False
try:
self._file_path.write_text(content, encoding="utf-8")
self._words_cache = self._parse_csv_content(content)
logger.info(f"Saved {len(self._words_cache)} custom words")
return True
except Exception as e:
logger.error(f"Error saving custom words: {e}", exc_info=True)
return False
def get_content(self) -> str:
"""Get the raw content of the custom words file.
Returns:
The file content as a string, or empty string if file doesn't exist.
"""
if self._file_path is None or not self._file_path.exists():
return ""
try:
return self._file_path.read_text(encoding="utf-8")
except Exception as e:
logger.error(f"Error reading custom words file: {e}", exc_info=True)
return ""
logger.debug("TagFTSIndex not available, returning empty results")
return []
def get_custom_words_service() -> CustomWordsService:
@@ -313,4 +88,4 @@ def get_custom_words_service() -> CustomWordsService:
return CustomWordsService.get_instance()
__all__ = ["CustomWordsService", "WordEntry", "get_custom_words_service"]
__all__ = ["CustomWordsService", "get_custom_words_service"]