# 501 lines · 16 KiB · Python  (file-viewer header left over from extraction; not part of the program)
import asyncio
|
|
import json
|
|
import os
|
|
import random
|
|
import shutil
|
|
import threading
|
|
import time
|
|
|
|
### transcribe audio
|
|
import uuid
|
|
from contextlib import suppress
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
from threading import Event, Thread
|
|
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
from threading import Event, Thread
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import soundfile as sf
|
|
import yaml
|
|
from dotenv import load_dotenv
|
|
from faster_whisper import WhisperModel
|
|
from openai import OpenAI
|
|
from process.asr_func.asr_auto_record import record_on_speech, transcribe_audio
|
|
|
|
from process.asr_func.asr_transcribe_groq import record_on_speech, transcribe_audio_groq
|
|
|
|
from process.llm_funcs.llm_scr import llm_response, llm_response_with_memory
|
|
from process.tts_func.sovits_ping import (
|
|
get_wav_duration,
|
|
play_audio,
|
|
sovits_gen,
|
|
sovits_gen_emotion,
|
|
)
|
|
from process.tts_func.tts_preprocess import clean_llm_output
|
|
from process.vrm_func.vrm_ping import vrm_animate, vrm_talk
|
|
|
|
|
|
# load_dotenv() (python-dotenv) runs before the lookup below so that a key
# supplied through a .env file is visible in the process environment.
load_dotenv()

# None when OPENAI_API_KEY is not set.
openai_api_key = os.environ.get("OPENAI_API_KEY")
|
|
|
|
|
|
def get_wav_duration(path):
    """Return the duration of the audio file at ``path``.

    The file is opened with ``soundfile`` and the result is ``len(file)``
    divided by the file's ``samplerate``.

    Note: this definition rebinds the ``get_wav_duration`` name that is also
    imported from ``process.tts_func.sovits_ping`` near the top of the file.
    """
    with sf.SoundFile(path) as audio:
        length = len(audio)
        rate = audio.samplerate
        return length / rate
|
|
|
|
|
|
# Startup banner; emitted once, as a side effect of loading this module.
_STARTUP_BANNER = " \n ========= Starting Chat... ================ \n"
print(_STARTUP_BANNER)
|
|
|
|
|
|
#!/usr/bin/env python3
|
|
"""
|
|
Streaming LLM -> chunked TTS -> queued playback script for Riko.
|
|
|
|
What it does:
|
|
- Records user speech (uses your record_on_speech)
|
|
- Transcribes the recording (transcribe_audio_groq)
|
|
- Streams LLM text (OpenAI Responses streaming)
|
|
- For each chunk: generate TTS via sovits_gen_emotion, copy to public audio dir, enqueue for playback
|
|
- Playback loop calls vrm_talk and vrm_animate and waits for the audio's duration to avoid overlap
|
|
- At the end of the stream, the full assistant text is appended to the JSON history file
|
|
|
|
Fill in or import the helper functions you already have in your project:
|
|
- record_on_speech(output_file, samplerate, channels, silence_threshold, silence_duration, device)
|
|
- transcribe_audio(whisper_model, aud_path)
|
|
- clean_asr_output(text)
|
|
- sovits_gen_emotion(in_text, emotion, output_wav_pth) -> returns path to generated wav (must write to output_wav_pth)
|
|
- vrm_talk(public_audio_path, expression, llm_output, duration)
|
|
- vrm_animate(cmd, path)
|
|
- clean_llm_output(text)
|
|
- get_emotion(text, emotion_model, tokenizer)
|
|
- map_emotion_to_expression(emotion)
|
|
- get_wav_duration(path)
|
|
|
|
Make sure ~/riko_project_v1/character_config.yaml contains history_file, presets.default.system_prompt and (optionally) model (see your example config).
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import time
|
|
import uuid
|
|
from contextlib import suppress
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
from threading import Thread
|
|
import requests
|
|
import yaml
|
|
from openai import OpenAI
|
|
|
|
# ---------------------------
|
|
# Load config + OpenAI client
|
|
# ---------------------------
|
|
# Character configuration lives in the user's home directory.
CONFIG_PATH = os.path.expanduser("~/riko_project_v1/character_config.yaml")
if not os.path.exists(CONFIG_PATH):
    raise FileNotFoundError(f"Config not found at {CONFIG_PATH}")

# Read the YAML as UTF-8 explicitly so non-ASCII prompt text does not depend
# on the platform's locale encoding.
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    char_config = yaml.safe_load(f)

# yaml.safe_load returns None for an empty document; fail with a clear message
# instead of an opaque "NoneType is not subscriptable" further down.
if not isinstance(char_config, dict):
    raise ValueError(f"Config at {CONFIG_PATH} must be a YAML mapping")

# Path of the JSON conversation history (required key).
HISTORY_FILE = char_config["history_file"]
# Model name sent to the OpenAI Responses API (optional key).
MODEL = char_config.get("model", "gpt-4.1-mini")
CASE_SYSTEM_PROMPT = "You are a helpful assistant."  # override for testing.

# Initial conversation: a single system message taken from the default preset.
# To test with CASE_SYSTEM_PROMPT instead, use it as the "text" value below.
SYSTEM_PROMPT = [
    {
        "role": "system",
        "content": [
            {
                "type": "input_text",
                "text": char_config["presets"]["default"]["system_prompt"],
            }
        ],
    }
]

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise EnvironmentError("Please set OPENAI_API_KEY in your environment")

# Shared OpenAI client used by stream_text_chunks().
client = OpenAI(api_key=OPENAI_API_KEY)
|
|
|
|
# ---------------------------
|
|
# History utilities
|
|
# ---------------------------
|
|
|
|
|
|
def load_history(path=None):
    """Load the conversation history as a list of message dicts.

    Args:
        path: JSON file to read. Defaults to ``HISTORY_FILE``.

    Returns:
        The parsed JSON content of the history file, or a fresh deep copy of
        ``SYSTEM_PROMPT`` when the file does not exist yet.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON. This
            is deliberately not swallowed, so a damaged history is never
            silently replaced on the next save.
    """
    import copy  # local import: not among this file's top-level imports

    history_path = HISTORY_FILE if path is None else path
    try:
        # Try to open directly rather than exists()-then-open, which can race.
        with open(history_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # Deep copy so callers can mutate nested message dicts without
        # altering the module-level SYSTEM_PROMPT template.
        return copy.deepcopy(SYSTEM_PROMPT)
|
|
|
|
|
|
def save_history(history, path=None):
    """Write the conversation history to disk as indented JSON.

    The history is serialized in memory first and then moved into place with
    ``os.replace``, so a serialization error or a crash mid-write cannot
    leave a truncated history file behind.

    Args:
        history: JSON-serializable list of message dicts.
        path: Destination file. Defaults to ``HISTORY_FILE``.

    Raises:
        TypeError: if ``history`` contains values JSON cannot encode; the
            existing file is left untouched in that case.
    """
    target = HISTORY_FILE if path is None else path

    # Serialize before touching the filesystem.
    payload = json.dumps(history, indent=2)

    # Write next to the target so os.replace stays on one filesystem.
    tmp_path = f"{target}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload)
    os.replace(tmp_path, target)
|
|
|
|
|
|
|
|
# ---------------------------
|
|
# State Manager
|
|
# ---------------------------
|
|
# Base address of the HTTP endpoint that set_vrm_state() posts to.
BASE_URL = "http://localhost:8000"


def set_vrm_state(state, timeout=5.0):
    """
    Set the VRM avatar's animation state.

    Valid states:
    - idle: Avatar looks around naturally, with chance to look at user
    - listening: Avatar focuses on user with occasional side glances
    - thinking: Avatar looks away with eyes leading head movement
    - talking: Avatar nods with variable intensity and head tilts

    Args:
        state: Name of the state, posted as ``{"state": state}`` to
            ``{BASE_URL}/set_state``.
        timeout: Seconds to wait for the request before giving up. Without a
            timeout, ``requests`` waits indefinitely, which would freeze the
            chat loop if the endpoint stops answering.

    Returns:
        The ``requests`` response object.

    Raises:
        requests.exceptions.RequestException: on connection failure or
            timeout.
    """
    url = f"{BASE_URL}/set_state"
    payload = {"state": state}
    resp = requests.post(url, json=payload, timeout=timeout)
    print(f"[set_state] Status: {resp.status_code}, State: {state}")
    return resp
|
|
|
|
# ---------------------------
|
|
# Streaming helper
|
|
# ---------------------------
|
|
|
|
|
|
def _split_at_word_boundary(text):
    """Split ``text`` at its last space/newline; return ``(chunk, remainder)``.

    When no usable boundary exists (a single long word), the whole stripped
    text is returned as the chunk and the remainder is empty.
    """
    cut = max(text.rfind(" "), text.rfind("\n"))
    head = text[:cut].strip() if cut > 0 else ""
    if not head:
        return text.strip(), ""
    return head, text[cut + 1:]


def stream_text_chunks(messages, min_len=30, max_len=120):
    """Synchronous generator that yields text chunks as the model streams.

    Uses the OpenAI Responses streaming client (``client.responses.stream``)
    with the module-level ``MODEL``.

    Args:
        messages: Conversation passed as ``input`` to the Responses API.
        min_len: Minimum buffered length before a sentence-ending character
            triggers a chunk.
        max_len: Buffered length at which a chunk is forced even without
            sentence-ending punctuation.

    Yields:
        Non-empty, stripped text chunks in stream order. Forced chunks are cut
        at the last word boundary, so a word is never split across two chunks;
        the unfinished word stays in the buffer for the next chunk.
    """
    sentence_endings = (".", "?", "!", "…")
    buffer = ""

    with client.responses.stream(
        model=MODEL,
        input=messages,
        temperature=1,
        top_p=1,
        max_output_tokens=2048,
    ) as stream:
        for event in stream:
            # event.type values: "response.output_text.delta", "response.output_text.done", ...
            if event.type == "response.output_text.delta":
                buffer += event.delta

                # Rule 1: emit if buffer ends with punctuation AND is long enough
                if buffer.endswith(sentence_endings) and len(buffer) >= min_len:
                    chunk, buffer = buffer.strip(), ""
                # Rule 2: force a chunk once the buffer gets too long, but cut
                # on a word boundary and carry the partial word over.
                elif len(buffer) >= max_len:
                    chunk, buffer = _split_at_word_boundary(buffer)
                else:
                    continue

                if chunk:
                    yield chunk

            elif event.type == "response.output_text.done":
                # Flush what is left, and clear the buffer so the same text
                # can never be flushed a second time.
                tail, buffer = buffer.strip(), ""
                if tail:
                    yield tail

        # Stream ended without a "done" event: do not drop buffered text.
        leftover = buffer.strip()
        if leftover:
            yield leftover

        # Kept from the original flow; the returned object is not used here.
        # NOTE(review): presumably raises if the stream never completed --
        # confirm against the OpenAI SDK before removing.
        stream.get_final_response()
|
|
|
|
|
|
# ---------------------------
|
|
# Playback worker (single-threaded sequential playback)
|
|
# ---------------------------
|
|
# ==================== Item Types ====================
|
|
class ItemType(Enum):
    """Kinds of entries that can be placed on the playback queue."""

    # A chunk of text to be spoken.
    TEXT = "text"

    # A function invocation to be executed in queue order.
    FUNCTION_CALL = "function_call"
|
|
|
|
|
|
@dataclass
class PlaybackItem:
    """One unit of work for the playback queue (spoken text or a function call)."""

    # Which kind of item this is; selects how the worker handles it.
    item_type: ItemType

    # Text content for TEXT items, or the function name for FUNCTION_CALL items.
    content: str

    # Function arguments (FUNCTION_CALL items).
    arguments: Optional[Dict] = None

    # Path to the audio file for text items.
    audio_path: Optional[Path] = None

    # Expression name handed to the avatar together with the audio.
    expression: str = "relaxed"

    # How long the worker waits after handing the audio over.
    duration: float = 0.0

    # Whether a function needs to be synced with TTS.
    needs_sync: bool = False
|
|
|
|
|
|
# ==================== Playback Worker ====================
|
|
class OrderedPlaybackWorker:
    """
    Worker that processes text and function calls in strict order.

    A single background thread takes items off a FIFO queue one at a time,
    so audio never overlaps and function calls run at their queued position.
    ``queue_finished_event`` is set whenever the worker has nothing left to
    do, which lets callers block in ``wait_until_finished()``.
    """

    def __init__(self):
        self.queue = Queue()
        self.thread = Thread(target=self._run, daemon=True)
        self._running = False
        self._talking = False
        # Serializes "clear event + put" in enqueue() against "queue empty? ->
        # set event" in _run(). Without it the worker could mark the queue
        # finished just after a new item was added, releasing waiters early.
        self._finish_lock = threading.Lock()
        self.queue_finished_event = Event()
        self.queue_finished_event.set()  # Start as finished (no items)

    def start(self):
        """Start the background thread (no-op if already started)."""
        if not self._running:
            self._running = True
            self.thread.start()

    def enqueue(self, item: "PlaybackItem"):
        """Add an item to the playback queue."""
        with self._finish_lock:
            self.queue_finished_event.clear()
            self.queue.put(item)

    def wait_until_finished(self, timeout=None):
        """
        Wait until the playback queue is completely empty and processed.

        Args:
            timeout: Maximum time to wait in seconds (None = wait forever)

        Returns:
            True if queue finished, False if timeout occurred
        """
        return self.queue_finished_event.wait(timeout)

    def _run(self):
        """Main playback loop - processes items in strict order."""
        while True:
            item = self.queue.get()
            if item is None:  # sentinel pushed by stop()
                break

            # One bad item must not kill the thread: a dead worker would never
            # set the finished event and every waiter would hang forever.
            try:
                if item.item_type == ItemType.TEXT:
                    self._process_text_item(item)
                elif item.item_type == ItemType.FUNCTION_CALL:
                    self._process_function_item(item)
            except Exception as e:
                print(f"playback item failed: {e}")

            # Return to idle if queue is empty
            if self.queue.empty():
                try:
                    idle_path = Path("animations/mixamo") / "Idle.fbx"
                    vrm_animate("start_mixamo", str(idle_path))
                    self._talking = False
                except Exception as e:
                    print(f"vrm_animate (idle) failed: {e}")

                # Re-check under the lock: an item may have arrived while the
                # idle animation was being requested.
                with self._finish_lock:
                    if self.queue.empty():
                        self.queue_finished_event.set()

        # Shutting down: release anyone still blocked in wait_until_finished().
        self.queue_finished_event.set()

    def _process_text_item(self, item: "PlaybackItem"):
        """Process a text item - play TTS audio."""
        try:
            # Start talking animation if not already talking
            if not self._talking:
                anim_path = Path("animations/mixamo") / "Talking.fbx"
                vrm_animate("start_mixamo", str(anim_path))
                self._talking = True
        except Exception as e:
            print(f"vrm_animate (start talking) failed: {e}")

        # Send audio to VRM
        try:
            vrm_talk(
                str(item.audio_path), item.expression, item.content, int(item.duration)
            )
        except Exception as e:
            print(f"vrm_talk failed: {e}")

        # Wait for audio duration to prevent overlap
        time.sleep(item.duration)

    def _process_function_item(self, item: "PlaybackItem"):
        """Handle a function-call item.

        ``_run`` dispatches FUNCTION_CALL items here. This module wires no
        function handlers into the worker, so the item is reported and
        skipped, which keeps the queue moving instead of failing on a
        missing method.
        """
        print(
            f"[playback] no handler for function call "
            f"'{item.content}' (arguments={item.arguments}); skipping"
        )

    def stop(self):
        """Ask the worker to exit after the already-queued items and wait for it."""
        if not self.thread.is_alive():
            # Never started or already finished: joining an unstarted thread
            # raises RuntimeError, and there is nothing to wait for.
            return
        self.queue.put(None)
        self.thread.join()
|
|
|
|
# ---------------------------
|
|
# Utilities
|
|
# ---------------------------
|
|
|
|
|
|
def ensure_dirs():
    """Create the ``client/audio`` and ``audio`` directories if they are missing.

    Both paths are relative to the current working directory; directories
    that already exist are left as they are.
    """
    for folder in ("client/audio", "audio"):
        Path(folder).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def copy_to_public(client_path: Path, public_path: Path):
    """Copy the audio file at ``client_path`` to ``public_path``.

    The public location is the one later handed to ``vrm_talk``. The copy is
    made with ``shutil.copy2``; the source file is left in place.
    """
    shutil.copy2(src=client_path, dst=public_path)
|
|
|
|
|
|
# Fallback duration reader (if you don't have one)
|
|
|
|
|
|
def fallback_get_wav_duration(p: Path):
    """Return the duration in seconds of the WAV file ``p`` using only ``wave``.

    Any failure (missing file, unreadable header, zero sample rate, ...)
    yields the fixed default of 3.0 seconds instead of raising.
    """
    try:
        import wave

        with wave.open(str(p), "rb") as reader:
            frame_count = reader.getnframes()
            sample_rate = reader.getframerate()
            return frame_count / float(sample_rate)
    except Exception:
        return 3.0
|
|
|
|
|
|
# ---------------------------
|
|
# Main orchestration
|
|
# ---------------------------
|
|
|
|
|
|
def _avatar_best_effort(label, func, *args):
    """Call an avatar helper; report (but never propagate) a failure."""
    try:
        func(*args)
    except Exception as e:
        print(f"[avatar] {label} failed: {e}")


def _synthesize_chunk(chunk, emotion="relaxed", expression="relaxed"):
    """Run TTS for one text chunk; return a PlaybackItem, or None if nothing to speak."""
    tts_read_text = clean_llm_output(chunk)
    if not str(tts_read_text or "").strip():
        # Cleaning removed everything: nothing to synthesize.
        return None

    # Unique filename per chunk so queued audio is never overwritten.
    filename = f"output_{uuid.uuid4().hex}.wav"
    client_out = Path("client") / "audio" / filename
    public_out = Path("audio") / filename
    client_out.parent.mkdir(parents=True, exist_ok=True)

    # Generate TTS (blocking). Expected to write client_out.
    try:
        sovits_gen_emotion(
            tts_read_text, emotion=emotion, output_wav_pth=str(client_out)
        )
    except TypeError:
        # Fallback if the signature is sovits_gen_emotion(text, emotion, output_path)
        sovits_gen_emotion(tts_read_text, emotion, str(client_out))

    # Copy to the public path expected by the VRM bridge.
    copy_to_public(client_out, public_out)

    try:
        duration = get_wav_duration(public_out)
    except Exception:
        duration = fallback_get_wav_duration(public_out)

    return PlaybackItem(
        item_type=ItemType.TEXT,
        content=tts_read_text,
        audio_path=public_out,
        expression=expression,
        duration=duration,
    )


def main_loop():
    """Run the voice chat loop until interrupted with Ctrl+C.

    Each turn: wait for queued audio to finish, record the user, transcribe,
    stream the model's reply in chunks, synthesize and enqueue each chunk for
    ordered playback, then append the exchange to the history file.

    Avatar state/animation calls are best-effort: a failure is printed and the
    turn continues. Any other error aborts the current turn, is printed, and
    the loop restarts after a short pause.
    """
    ensure_dirs()

    playback = OrderedPlaybackWorker()
    playback.start()

    mixamo_dir = Path("animations/mixamo")

    while True:
        try:
            # Wait for playback queue to finish before starting microphone,
            # so the avatar's own speech is not recorded.
            print("\n⏳ Waiting for playback queue to finish...")
            playback.wait_until_finished()
            print("✅ Queue finished, ready for input")

            # 1) Idle animation
            _avatar_best_effort(
                "idle animation", vrm_animate, "start_mixamo", str(mixamo_dir / "Idle.fbx")
            )
            _avatar_best_effort("idle state", set_vrm_state, "idle")

            # 2) Record the user
            conversation_recording = Path("audio") / "conversation.wav"
            conversation_recording.parent.mkdir(parents=True, exist_ok=True)
            conversation_recording = str(conversation_recording)
            record_on_speech(
                output_file=conversation_recording,
                samplerate=44100,
                channels=1,
                silence_threshold=0.02,  # Adjust based on your microphone sensitivity
                silence_duration=2,  # Stop after 2 seconds of silence
                device=None,  # Use default device, or specify by ID or name
            )

            # 3) Thinking animation
            _avatar_best_effort("thinking state", set_vrm_state, "thinking")
            _avatar_best_effort(
                "thinking animation",
                vrm_animate,
                "start_mixamo",
                str(mixamo_dir / "Thinking.fbx"),
            )

            # 4) Transcribe
            user_spoken_text = transcribe_audio_groq(aud_path=conversation_recording)
            if not str(user_spoken_text or "").strip():
                # Nothing intelligible was captured: don't send an empty user
                # message to the model or store it in the history.
                print("[asr] nothing transcribed, listening again")
                continue

            # 5) Build messages history
            messages = load_history()
            messages.append(
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_spoken_text}],
                }
            )

            # 6) Stream model and generate TTS per chunk
            print("[llm] streaming response...")
            spoken_chunks = []
            # Best-effort like the other avatar calls: an unreachable avatar
            # endpoint must not cancel the reply itself.
            _avatar_best_effort("talking state", set_vrm_state, "talking")
            for chunk in stream_text_chunks(messages):
                print("[chunk]", chunk)
                spoken_chunks.append(chunk)

                playback_item = _synthesize_chunk(chunk)
                if playback_item is not None:
                    playback.enqueue(playback_item)

            # 7) After streaming ends, append the full assistant message to history and save
            final_text = " ".join(spoken_chunks).strip()
            print("[llm final]", final_text)
            messages.append(
                {
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": final_text}],
                }
            )
            save_history(messages)

            # Short pause; the top of the loop blocks until queued audio is done.
            time.sleep(0.5)

        except KeyboardInterrupt:
            print("Interrupted by user, stopping.")
            playback.stop()
            break
        except Exception as e:
            print("Error in main loop:", e)
            time.sleep(1)
|
|
|
|
|
|
# Script entry point: start the chat loop only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main_loop()
|