mirror of
https://github.com/justUmen/Bjornulf_custom_nodes.git
synced 2026-03-21 20:52:11 -03:00
0.66
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
# nodes.py
|
||||
|
||||
import requests
|
||||
import numpy as np
|
||||
import io
|
||||
@@ -10,10 +12,9 @@ import sys
|
||||
import random
|
||||
import re
|
||||
from typing import Dict, Any, List, Tuple
|
||||
|
||||
class Everything(str):
|
||||
def __ne__(self, __value: object) -> bool:
|
||||
return False
|
||||
from server import PromptServer
|
||||
from aiohttp import web
|
||||
import json
|
||||
|
||||
language_map = {
|
||||
"ar": "Arabic", "cs": "Czech", "de": "German", "en": "English",
|
||||
@@ -23,30 +24,47 @@ language_map = {
|
||||
"zh-cn": "Chinese"
|
||||
}
|
||||
|
||||
DEFAULT_CONFIG = {
|
||||
"url": "http://localhost:8020",
|
||||
"language": "English",
|
||||
"speaker_wav": "default"
|
||||
}
|
||||
|
||||
class Everything(str):
|
||||
def __ne__(self, __value: object) -> bool:
|
||||
return False
|
||||
|
||||
class XTTSConfig:
|
||||
@classmethod
|
||||
def INPUT_TYPES(cls):
|
||||
return {
|
||||
"required": {
|
||||
"TTS_url": ("STRING", {"default": "http://localhost:8020"}),
|
||||
"language": (list(language_map.values()), {"default": language_map["en"]}),
|
||||
"speaker_wav": ("STRING", {"default": "default"}),
|
||||
}
|
||||
}
|
||||
|
||||
RETURN_TYPES = ("TTS_URL", "TTS_LANGUAGE", "TTS_SPEAKER")
|
||||
RETURN_NAMES = ("TTS_URL", "TTS_LANGUAGE", "TTS_SPEAKER")
|
||||
FUNCTION = "configure_xtts"
|
||||
CATEGORY = "Bjornulf"
|
||||
|
||||
def configure_xtts(self, TTS_url, language, speaker_wav):
|
||||
return (TTS_url, language, speaker_wav)
|
||||
|
||||
@classmethod
|
||||
def IS_CHANGED(cls, TTS_url, language, speaker_wav) -> float:
|
||||
return 0.0
|
||||
|
||||
class TextToSpeech:
|
||||
@classmethod
|
||||
def INPUT_TYPES(cls) -> Dict[str, Any]:
|
||||
speakers_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "speakers")
|
||||
speaker_options = ["default_for_language"] # Add default option
|
||||
speaker_options.extend([
|
||||
os.path.relpath(os.path.join(root, file), speakers_dir)
|
||||
for root, _, files in os.walk(speakers_dir)
|
||||
for file in files if file.endswith(".wav")
|
||||
])
|
||||
|
||||
speaker_options = speaker_options or ["No WAV files found"]
|
||||
|
||||
return {
|
||||
"required": {
|
||||
"text": ("STRING", {"multiline": True}),
|
||||
"language": (list(language_map.values()), {
|
||||
"default": language_map["en"],
|
||||
"display": "dropdown"
|
||||
}),
|
||||
"speaker_wav": (speaker_options, {
|
||||
"default": "default_for_language",
|
||||
"display": "dropdown"
|
||||
}),
|
||||
"language": (list(language_map.values()), {"default": "English"}),
|
||||
"speaker_wav": ("STRING", {"default": "default"}),
|
||||
"autoplay": ("BOOLEAN", {"default": True}),
|
||||
"save_audio": ("BOOLEAN", {"default": True}),
|
||||
"overwrite": ("BOOLEAN", {"default": False}),
|
||||
@@ -54,6 +72,9 @@ class TextToSpeech:
|
||||
},
|
||||
"optional": {
|
||||
"connect_to_workflow": (Everything("*"), {"forceInput": True}),
|
||||
"TTS_URL": ("TTS_URL", {"forceInput": True}),
|
||||
"TTS_LANGUAGE": ("TTS_LANGUAGE", {"forceInput": True}),
|
||||
"TTS_SPEAKER": ("TTS_SPEAKER", {"forceInput": True}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,63 +91,10 @@ class TextToSpeech:
|
||||
def sanitize_text(text: str) -> str:
|
||||
return re.sub(r'[^\w\s-]', '', text).replace(' ', '_')[:50]
|
||||
|
||||
def get_default_speaker(self, language_code: str) -> str:
|
||||
speakers_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "speakers")
|
||||
lang_dir = os.path.join(speakers_dir, language_code)
|
||||
|
||||
# First try to find default.wav
|
||||
default_path = os.path.join(language_code, "default.wav")
|
||||
if os.path.exists(os.path.join(speakers_dir, default_path)):
|
||||
return default_path
|
||||
|
||||
# If default.wav doesn't exist, use the first .wav file in the language directory
|
||||
if os.path.exists(lang_dir):
|
||||
for file in os.listdir(lang_dir):
|
||||
if file.endswith(".wav"):
|
||||
return os.path.join(language_code, file)
|
||||
|
||||
# If no suitable file is found, return the first available .wav file
|
||||
for root, _, files in os.walk(speakers_dir):
|
||||
for file in files:
|
||||
if file.endswith(".wav"):
|
||||
return os.path.relpath(os.path.join(root, file), speakers_dir)
|
||||
|
||||
return "No WAV files found"
|
||||
|
||||
def generate_audio(self, text: str, language: str, autoplay: bool, seed: int,
|
||||
save_audio: bool, overwrite: bool, speaker_wav: str,
|
||||
connect_to_workflow: Any = None) -> Tuple[Dict[str, Any], str, str, float]:
|
||||
language_code = self.get_language_code(language)
|
||||
|
||||
# Handle default_for_language option
|
||||
if speaker_wav == "default_for_language":
|
||||
speaker_wav = self.get_default_speaker(language_code)
|
||||
|
||||
sanitized_text = self.sanitize_text(text)
|
||||
|
||||
save_path = os.path.join("Bjornulf_TTS", language, speaker_wav, f"{sanitized_text}.wav")
|
||||
full_path = os.path.abspath(save_path)
|
||||
os.makedirs(os.path.dirname(full_path), exist_ok=True)
|
||||
|
||||
if os.path.exists(full_path) and not overwrite:
|
||||
print(f"Using existing audio file: {full_path}")
|
||||
audio_data = self.load_audio_file(full_path)
|
||||
else:
|
||||
audio_data = self.create_new_audio(text, language_code, speaker_wav, seed)
|
||||
if save_audio:
|
||||
self.save_audio_file(audio_data, full_path)
|
||||
|
||||
audio_output, _, duration = self.process_audio_data(autoplay, audio_data, full_path if save_audio else None)
|
||||
return (audio_output, save_path, full_path, duration)
|
||||
|
||||
def create_new_audio(self, text: str, language_code: str, speaker_wav: str, seed: int) -> io.BytesIO:
|
||||
def create_new_audio(self, text: str, language_code: str, speaker_wav: str, seed: int, TTS_config: Dict) -> io.BytesIO:
|
||||
random.seed(seed)
|
||||
if speaker_wav == "No WAV files found":
|
||||
print("Error: No WAV files available for text-to-speech.")
|
||||
return io.BytesIO()
|
||||
|
||||
encoded_text = urllib.parse.quote(text)
|
||||
url = f"http://localhost:8020/tts_stream?language={language_code}&speaker_wav={speaker_wav}&text={encoded_text}"
|
||||
url = f"{TTS_config['url']}/tts_stream?language={language_code}&speaker_wav={speaker_wav}&text={encoded_text}"
|
||||
|
||||
try:
|
||||
response = requests.get(url, stream=True)
|
||||
@@ -194,4 +162,92 @@ class TextToSpeech:
|
||||
return audio_data
|
||||
except Exception as e:
|
||||
print(f"Error loading audio file: {e}")
|
||||
return io.BytesIO()
|
||||
return io.BytesIO()
|
||||
|
||||
def generate_audio(self, text: str, language: str, speaker_wav: str,
|
||||
autoplay: bool, seed: int, save_audio: bool, overwrite: bool,
|
||||
TTS_URL: str = None, TTS_LANGUAGE: str = None,
|
||||
TTS_SPEAKER: str = None, connect_to_workflow: Any = None) -> Tuple[Dict[str, Any], str, str, float]:
|
||||
|
||||
# Use provided config values or fallback to node parameters
|
||||
config = {
|
||||
"url": TTS_URL if TTS_URL is not None else DEFAULT_CONFIG["url"],
|
||||
"language": TTS_LANGUAGE if TTS_LANGUAGE is not None else language,
|
||||
"speaker_wav": TTS_SPEAKER if TTS_SPEAKER is not None else speaker_wav
|
||||
}
|
||||
|
||||
language_code = self.get_language_code(config["language"])
|
||||
speaker_wav = config["speaker_wav"]
|
||||
|
||||
sanitized_text = self.sanitize_text(text)
|
||||
save_path = os.path.join("Bjornulf_TTS", config["language"], speaker_wav, f"{sanitized_text}.wav")
|
||||
full_path = os.path.abspath(save_path)
|
||||
os.makedirs(os.path.dirname(full_path), exist_ok=True)
|
||||
|
||||
if os.path.exists(full_path) and not overwrite:
|
||||
print(f"Using existing audio file: {full_path}")
|
||||
audio_data = self.load_audio_file(full_path)
|
||||
else:
|
||||
audio_data = self.create_new_audio(text, language_code, speaker_wav, seed, config)
|
||||
if save_audio:
|
||||
self.save_audio_file(audio_data, full_path)
|
||||
|
||||
audio_output, _, duration = self.process_audio_data(autoplay, audio_data, full_path if save_audio else None)
|
||||
return (audio_output, save_path, full_path, duration)
|
||||
|
||||
# GET VOICE FROM TTS SERVER (Not good for now)
|
||||
# @PromptServer.instance.routes.post("/bjornulf_TTS_get_voices")
|
||||
# async def get_voices(request):
|
||||
# try:
|
||||
# data = await request.json()
|
||||
# TTS_url = data.get('url', 'http://localhost:8020')
|
||||
|
||||
# # Use requests instead of client_session
|
||||
# response = requests.get(f"{TTS_url}/speakers")
|
||||
# response.raise_for_status() # Raise an exception for bad status codes
|
||||
# voices = response.json()
|
||||
|
||||
# # Transform the response to just get the voice_ids
|
||||
# voice_ids = [voice["voice_id"] for voice in voices]
|
||||
# return web.json_response({"voices": voice_ids})
|
||||
# except requests.RequestException as e:
|
||||
# print(f"Error fetching voices: {str(e)}")
|
||||
# return web.json_response({"error": str(e)}, status=500)
|
||||
# except Exception as e:
|
||||
# print(f"Unexpected error: {str(e)}")
|
||||
# return web.json_response({"error": str(e)}, status=500)
|
||||
|
||||
# Scan folder
|
||||
@PromptServer.instance.routes.post("/bjornulf_TTS_get_voices")
|
||||
async def get_voices(request):
|
||||
try:
|
||||
base_path = os.path.join("custom_nodes", "Bjornulf_custom_nodes", "speakers")
|
||||
voices_by_language = {}
|
||||
|
||||
# Scan each language directory
|
||||
for lang_code in language_map.keys():
|
||||
lang_path = os.path.join(base_path, lang_code)
|
||||
|
||||
if os.path.exists(lang_path):
|
||||
# List all .wav files in the language directory
|
||||
voice_ids = []
|
||||
for file in os.listdir(lang_path):
|
||||
if file.endswith('.wav'):
|
||||
voice_id = os.path.splitext(file)[0]
|
||||
# Include language code in voice ID
|
||||
full_voice_id = f"{lang_code}/{voice_id}"
|
||||
voice_ids.append(full_voice_id)
|
||||
|
||||
if voice_ids: # Only add languages that have voices
|
||||
voices_by_language[lang_code] = {
|
||||
"name": language_map[lang_code],
|
||||
"voices": voice_ids
|
||||
}
|
||||
|
||||
if not voices_by_language:
|
||||
return web.json_response({"error": "No voice files found"}, status=404)
|
||||
|
||||
return web.json_response({"languages": voices_by_language})
|
||||
except Exception as e:
|
||||
print(f"Unexpected error: {str(e)}")
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
Reference in New Issue
Block a user