# Ai_Assistant/server/_archive/main_chat_loop_STREAMINGV0-NOTOOLS.py
#
# 501 lines
# 16 KiB
# Python
# Raw Permalink Normal View History
#
# 2026-05-24 13:31:30 +02:00
import asyncio
import json
import os
import random
import shutil
import threading
import time
### transcribe audio
import uuid
from contextlib import suppress
from pathlib import Path
from queue import Queue
from threading import Event, Thread
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from queue import Queue
from threading import Event, Thread
from typing import Any, Dict, List, Optional
import soundfile as sf
import yaml
from dotenv import load_dotenv
from faster_whisper import WhisperModel
from openai import OpenAI
from process.asr_func.asr_auto_record import record_on_speech, transcribe_audio
from process.asr_func.asr_transcribe_groq import record_on_speech, transcribe_audio_groq
from process.llm_funcs.llm_scr import llm_response, llm_response_with_memory
from process.tts_func.sovits_ping import (
get_wav_duration,
play_audio,
sovits_gen,
sovits_gen_emotion,
)
from process.tts_func.tts_preprocess import clean_llm_output
from process.vrm_func.vrm_ping import vrm_animate, vrm_talk
# Pull variables from a .env file into the process environment before any
# of the lookups below read them.
load_dotenv()
# Lower-case copy of the API key read from the environment (None when unset).
openai_api_key = os.environ.get("OPENAI_API_KEY")
def get_wav_duration(path):
    """Return the length of the audio file at *path* in seconds.

    The file is opened with ``soundfile``; the duration is the number of
    frames divided by the sample rate.
    """
    with sf.SoundFile(path) as audio:
        frame_count = len(audio)
        sample_rate = audio.samplerate
    return frame_count / sample_rate
# Startup banner, emitted when this module's top-level code runs.
print(" \n ========= Starting Chat... ================ \n")
#!/usr/bin/env python3
"""
Streaming LLM -> chunked TTS -> queued playback script for Riko.
What it does:
- Records user speech (uses your record_on_speech)
- Transcribes the recording (transcribe_audio_groq)
- Streams LLM text (OpenAI Responses streaming)
- For each chunk: generate TTS via sovits_gen_emotion, copy to public audio dir, enqueue for playback
- Playback loop calls vrm_talk and vrm_animate and waits for the audio's duration to avoid overlap
- At the end of the stream, the full assistant text is appended to the JSON history file
Fill in or import the helper functions you already have in your project:
- record_on_speech(output_file, samplerate, channels, silence_threshold, silence_duration, device)
- transcribe_audio_groq(aud_path)
- clean_asr_output(text)
- sovits_gen_emotion(in_text, emotion, output_wav_pth) -> returns path to generated wav (must write to output_wav_pth)
- vrm_talk(public_audio_path, expression, llm_output, duration)
- vrm_animate(cmd, path)
- clean_llm_output(text)
- get_emotion(text, emotion_model, tokenizer)
- map_emotion_to_expression(emotion)
- get_wav_duration(path)
Make sure character_config.yaml contains the history_file key and presets.default.system_prompt; the model key is optional and defaults to gpt-4.1-mini (see your example config).
"""
import json
import os
import shutil
import time
import uuid
from contextlib import suppress
from pathlib import Path
from queue import Queue
from threading import Thread
import requests
import yaml
from openai import OpenAI
# ---------------------------
# Load config + OpenAI client
# ---------------------------
# Location of the character configuration; "~" is expanded to the current
# user's home directory.
CONFIG_PATH = os.path.expanduser("~/riko_project_v1/character_config.yaml")
if not os.path.exists(CONFIG_PATH):
    raise FileNotFoundError(f"Config not found at {CONFIG_PATH}")
# Decode explicitly as UTF-8: the platform default encoding (e.g. cp1252 on
# Windows) would mangle or reject non-ASCII text in the character prompt.
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    char_config = yaml.safe_load(f)
# An empty or non-mapping YAML document would otherwise fail a few lines
# below with an opaque "NoneType is not subscriptable" error.
if not isinstance(char_config, dict):
    raise ValueError(f"Config at {CONFIG_PATH} must be a YAML mapping")
# Path of the JSON file holding the conversation history (required key).
HISTORY_FILE = char_config["history_file"]
# Model name passed to the OpenAI client; optional in the config.
MODEL = char_config.get("model", "gpt-4.1-mini")
# MODEL = 'gpt-4.1'
CASE_SYSTEM_PROMPT = "You are a helpful assistant."  # override for testing.
# Initial conversation: a single system message whose text comes from the
# "default" preset of the character config. load_history() falls back to this
# list when no history file exists yet.
SYSTEM_PROMPT = [
    {
        "role": "system",
        "content": [
            {
                "type": "input_text",
                "text": char_config["presets"]["default"]["system_prompt"],
            },
            # Swap in the line below to test with the generic prompt instead:
            # {"type": "input_text", "text": CASE_SYSTEM_PROMPT}
        ],
    },
]
# The API key is mandatory: fail at start-up rather than on the first request.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise EnvironmentError("Please set OPENAI_API_KEY in your environment")
# Shared OpenAI client used by stream_text_chunks().
client = OpenAI(api_key=OPENAI_API_KEY)
# ---------------------------
# History utilities
# ---------------------------
def load_history():
    """Return the saved conversation, or a fresh one seeded with the system prompt.

    Returns:
        The message list decoded from ``HISTORY_FILE``. When that file does
        not exist, or exists but is empty (for example after an interrupted
        write), an independent copy of ``SYSTEM_PROMPT`` is returned instead.

    Raises:
        json.JSONDecodeError: if the file has content that is not valid JSON.
            This is deliberately not swallowed, so that a damaged history is
            never silently overwritten by the next save.
    """
    # Local import: only this function needs it.
    from copy import deepcopy

    try:
        # Open directly instead of checking existence first; this removes the
        # window in which the file can vanish between the check and the open.
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            raw = f.read()
    except FileNotFoundError:
        raw = ""
    if not raw.strip():
        # Deep copy so callers that edit messages in place can never alter the
        # module-level SYSTEM_PROMPT constant.
        return deepcopy(SYSTEM_PROMPT)
    return json.loads(raw)
def save_history(history):
    """Write *history* to ``HISTORY_FILE`` as indented JSON, atomically.

    The data is first written to a sibling temporary file and then moved over
    the real file with ``os.replace``. If serialization fails or the process
    is interrupted mid-write, the previous history file is left untouched
    instead of being truncated.

    Args:
        history: JSON-serializable list of conversation messages.

    Raises:
        TypeError / ValueError: if *history* cannot be serialized.
        OSError: if the file cannot be written or replaced.
    """
    tmp_path = f"{HISTORY_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(history, f, indent=2)
        os.replace(tmp_path, HISTORY_FILE)
    except BaseException:
        # Also covers KeyboardInterrupt: never leave a half-written temp file.
        with suppress(OSError):
            os.remove(tmp_path)
        raise
# ---------------------------
# State Manager
# ---------------------------
# Base address of the local server that exposes the /set_state endpoint.
BASE_URL = "http://localhost:8000"


def set_vrm_state(state, timeout=5.0):
    """
    Set the VRM avatar's animation state.

    Valid states:
    - idle: Avatar looks around naturally, with chance to look at user
    - listening: Avatar focuses on user with occasional side glances
    - thinking: Avatar looks away with eyes leading head movement
    - talking: Avatar nods with variable intensity and head tilts

    Args:
        state: Name of the state, sent as ``{"state": state}``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` waits indefinitely and an unresponsive
            avatar server would freeze the whole chat loop.

    Returns:
        The ``requests.Response`` from the POST to ``{BASE_URL}/set_state``.

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    url = f"{BASE_URL}/set_state"
    payload = {"state": state}
    resp = requests.post(url, json=payload, timeout=timeout)
    print(f"[set_state] Status: {resp.status_code}, State: {state}")
    return resp
# ---------------------------
# Streaming helper
# ---------------------------
def stream_text_chunks(messages, min_len=30, max_len=120):
    """Synchronous generator that yields text chunks as the model streams.

    Text deltas from the OpenAI Responses streaming client are accumulated in
    a buffer, which is emitted as a chunk when either:

    1. it ends with sentence punctuation (".", "?", "!") and holds at least
       ``min_len`` characters, or
    2. it has grown to ``max_len`` characters (forced split).

    Whatever is left is flushed when the text output is done or the stream
    ends. Empty or whitespace-only chunks are never yielded.

    Args:
        messages: Conversation in the Responses API ``input`` format.
        min_len: Minimum buffer length before a sentence boundary splits.
        max_len: Buffer length that forces a split regardless of punctuation.

    Yields:
        Stripped, non-empty text chunks in generation order.
    """
    # NOTE: the previous tuple also held an empty string. Every string "ends
    # with" "", so the punctuation rule was always true and chunks were cut
    # mid-sentence as soon as min_len was reached.
    sentence_endings = (".", "?", "!")
    buffer = ""
    with client.responses.stream(
        model=MODEL,
        input=messages,
        temperature=1,
        top_p=1,
        max_output_tokens=2048,
    ) as stream:
        for event in stream:
            # event.type values: "response.output_text.delta", "response.output_text.done", ...
            if event.type == "response.output_text.delta":
                buffer += event.delta
                # Ignore trailing whitespace/newlines that follow the
                # punctuation mark when testing for a sentence boundary.
                at_sentence_end = buffer.rstrip().endswith(sentence_endings)
                if (at_sentence_end and len(buffer) >= min_len) or len(buffer) >= max_len:
                    chunk = buffer.strip()
                    buffer = ""
                    if chunk:
                        yield chunk
            elif event.type == "response.output_text.done":
                # Final flush; reset the buffer so the same text can never be
                # emitted a second time.
                chunk = buffer.strip()
                buffer = ""
                if chunk:
                    yield chunk
    # Safety net: the stream ended without a "done" event for the text.
    chunk = buffer.strip()
    if chunk:
        yield chunk
# ---------------------------
# Playback worker (single-threaded sequential playback)
# ---------------------------
# ==================== Item Types ====================
class ItemType(Enum):
    """Kinds of entries the playback queue can carry."""

    # Text to be spoken; handled by the worker's text/TTS path.
    TEXT = "text"
    # A function call; handled by the worker's function path.
    FUNCTION_CALL = "function_call"
@dataclass
class PlaybackItem:
    """One unit of work for the playback queue.

    Attributes:
        item_type: Whether this is spoken text or a function call.
        content: The text to speak, or the function name for function calls.
        arguments: Function arguments (function-call items only).
        audio_path: Audio file to play (text items only).
        expression: Facial expression forwarded with the audio.
        duration: Audio length in seconds; the worker sleeps this long.
        needs_sync: Whether a function call must be synced with TTS.
    """

    item_type: ItemType
    content: str
    arguments: Optional[Dict] = None
    audio_path: Optional[Path] = None
    expression: str = "relaxed"
    duration: float = 0.0
    needs_sync: bool = False
# ==================== Playback Worker ====================
class OrderedPlaybackWorker:
    """
    Worker that processes text and function calls in strict order.

    Ensures audio doesn't overlap and functions execute at the right time.
    Items are consumed from a FIFO queue by one background thread.
    ``queue_finished_event`` is set exactly when every item passed to
    ``enqueue`` has been fully processed, so ``wait_until_finished`` can be
    used to hold the microphone until the avatar has stopped speaking.
    """

    def __init__(self):
        self.queue = Queue()
        self.thread = Thread(target=self._run, daemon=True)
        self._running = False
        self._talking = False
        # Count of enqueued-but-unfinished items. The counter and the event
        # are only changed together under _state_lock; a separate "is the
        # queue empty?" check followed by event.set() could race with
        # enqueue() and report "finished" while an item was still waiting.
        self._pending = 0
        self._state_lock = threading.Lock()
        self.queue_finished_event = Event()
        self.queue_finished_event.set()  # Start as finished (no items)

    def start(self):
        """Start the background thread (no-op if already started)."""
        if not self._running:
            self._running = True
            self.thread.start()

    def enqueue(self, item: "PlaybackItem"):
        """Add an item to the playback queue and mark the queue as busy."""
        with self._state_lock:
            self._pending += 1
            self.queue_finished_event.clear()
        self.queue.put(item)

    def wait_until_finished(self, timeout=None):
        """
        Wait until the playback queue is completely empty and processed.

        Args:
            timeout: Maximum time to wait in seconds (None = wait forever)

        Returns:
            True if queue finished, False if timeout occurred
        """
        return self.queue_finished_event.wait(timeout)

    def _run(self):
        """Main playback loop - processes items in strict order."""
        while True:
            item = self.queue.get()
            if item is None:
                # Shutdown sentinel posted by stop().
                break
            try:
                if item.item_type == ItemType.TEXT:
                    self._process_text_item(item)
                elif item.item_type == ItemType.FUNCTION_CALL:
                    self._process_function_item(item)
            except Exception as e:
                # One bad item must not kill the thread: a dead worker would
                # never set the event and waiters would block forever.
                print(f"playback item failed: {e}")
            finally:
                self._finish_item()
        # Release anyone still waiting once the worker shuts down.
        self.queue_finished_event.set()

    def _finish_item(self):
        """Return to idle when nothing is queued and update the finished flag."""
        if self.queue.empty():
            try:
                idle_path = Path("animations/mixamo") / "Idle.fbx"
                vrm_animate("start_mixamo", str(idle_path))
                self._talking = False
            except Exception as e:
                print(f"vrm_animate (idle) failed: {e}")
        with self._state_lock:
            # max() guards against items put on the queue without enqueue().
            self._pending = max(0, self._pending - 1)
            if self._pending == 0:
                self.queue_finished_event.set()

    def _process_text_item(self, item: "PlaybackItem"):
        """Process a text item - play TTS audio."""
        try:
            # Start talking animation if not already talking
            if not self._talking:
                anim_path = Path("animations/mixamo") / "Talking.fbx"
                vrm_animate("start_mixamo", str(anim_path))
                self._talking = True
        except Exception as e:
            print(f"vrm_animate (start talking) failed: {e}")
        # Send audio to VRM
        try:
            vrm_talk(
                str(item.audio_path), item.expression, item.content, int(item.duration)
            )
        except Exception as e:
            print(f"vrm_talk failed: {e}")
        # Wait for audio duration to prevent overlap. time.sleep() rejects
        # negative values, so clamp at zero.
        time.sleep(max(0.0, item.duration))

    def _process_function_item(self, item: "PlaybackItem"):
        """Handle a function-call item.

        NOTE(review): _run dispatches to this method but no implementation
        existed, so any FUNCTION_CALL item raised AttributeError. This
        placeholder only reports and skips the call - replace it with real
        dispatch if function calls are ever enqueued.
        """
        print(f"function call '{item.content}' skipped: no handler implemented")

    def stop(self):
        """Ask the worker to exit after the already-queued items and wait for it."""
        if not self.thread.is_alive():
            # Never started (or already stopped): joining an unstarted thread
            # raises RuntimeError, so there is nothing to do.
            return
        self.queue.put(None)
        self.thread.join()
# ---------------------------
# Utilities
# ---------------------------
def ensure_dirs():
    """Create the audio working folders (relative to the CWD) if missing.

    ``client/audio`` receives the generated TTS files and ``audio`` holds the
    copies handed to the avatar. Existing folders are left untouched.
    """
    for folder in (Path("client/audio"), Path("audio")):
        folder.mkdir(parents=True, exist_ok=True)
def copy_to_public(client_path: Path, public_path: Path):
    """Copy a generated audio file into the public folder.

    The file at *client_path* is duplicated to *public_path*, which is where
    vrm_talk expects to find it. Nothing is returned.
    """
    source, destination = client_path, public_path
    shutil.copy2(source, destination)
# Fallback duration reader (if you don't have one)
def fallback_get_wav_duration(p: Path):
    """Best-effort WAV duration in seconds using only the standard library.

    Used when the primary duration reader fails. The duration is the frame
    count divided by the frame rate; if the file cannot be read for any
    reason, a default of 3.0 seconds is returned instead of raising.
    """
    try:
        import wave

        # Wave_read objects close themselves when used as context managers.
        with wave.open(str(p), "r") as wav_file:
            frame_count = wav_file.getnframes()
            frame_rate = wav_file.getframerate()
            return frame_count / float(frame_rate)
    except Exception:
        return 3.0
# ---------------------------
# Main orchestration
# ---------------------------
def _synthesize_chunk(chunk, emotion="relaxed", expression="relaxed"):
    """Turn one streamed text chunk into a ready-to-enqueue PlaybackItem.

    Generates the TTS audio into ``client/audio``, copies it to ``audio``
    (the path handed to the avatar), measures its duration, and wraps it all
    in a PlaybackItem. Returns None when nothing speakable is left after
    cleaning, so no empty audio is generated.
    """
    tts_read_text = clean_llm_output(chunk)
    # presumably clean_llm_output returns a str - TODO confirm
    if not str(tts_read_text or "").strip():
        return None

    # Unique filename per chunk so queued clips never overwrite each other.
    filename = f"output_{uuid.uuid4().hex}.wav"
    client_out = Path("client") / "audio" / filename
    public_out = Path("audio") / filename
    client_out.parent.mkdir(parents=True, exist_ok=True)

    # generate TTS (blocking). Expected to write client_out
    try:
        sovits_gen_emotion(
            tts_read_text, emotion=emotion, output_wav_pth=str(client_out)
        )
    except TypeError:
        # fallback if your function signature is sovits_gen_emotion(text, emotion, output_path)
        sovits_gen_emotion(tts_read_text, emotion, str(client_out))

    # copy to public path expected by VRM bridge
    copy_to_public(client_out, public_out)

    try:
        duration = get_wav_duration(public_out)
    except Exception:
        duration = fallback_get_wav_duration(public_out)

    return PlaybackItem(
        item_type=ItemType.TEXT,
        content=tts_read_text,
        audio_path=public_out,
        expression=expression,
        duration=duration,
    )


def main_loop():
    """Run the voice chat loop until interrupted with Ctrl+C.

    Each turn: wait for queued speech to finish, record the user, transcribe,
    stream the model's reply, synthesize and enqueue each chunk for ordered
    playback, then append the exchange to the history file.

    Errors inside a turn are printed and the loop continues. A chunk whose
    synthesis fails is skipped without discarding the rest of the reply.
    """
    ensure_dirs()
    playback = OrderedPlaybackWorker()
    playback.start()
    # Load any models or tokenizers you have for emotion detection here
    # whisper_model, emotion_model, tokenizer = load_your_models()
    while True:
        try:
            # Wait for playback to finish before opening the microphone, so
            # the assistant does not record its own voice.
            print("\n⏳ Waiting for playback queue to finish...")
            playback.wait_until_finished()
            print("✅ Queue finished, ready for input")

            # 1) Idle animation (best effort - the avatar is optional)
            try:
                idle_anim = Path("animations/mixamo") / "Idle.fbx"
                vrm_animate("start_mixamo", str(idle_anim))
                set_vrm_state("idle")
            except Exception as e:
                print(f"idle animation failed: {e}")

            # 2) Record the user's speech
            # cont = input(">>> ENTER TO CONTINUE")
            conversation_recording = Path("audio") / "conversation.wav"
            conversation_recording.parent.mkdir(parents=True, exist_ok=True)
            conversation_recording = str(conversation_recording)
            record_on_speech(
                output_file=conversation_recording,
                samplerate=44100,
                channels=1,
                silence_threshold=0.02,  # Adjust based on your microphone sensitivity
                silence_duration=2,  # Stop after 2 seconds of silence
                device=None,  # Use default device, or specify by ID or name
            )

            # 3) Thinking animation (best effort)
            try:
                thinking_anim = Path("animations/mixamo") / "Thinking.fbx"
                set_vrm_state("thinking")
                vrm_animate("start_mixamo", str(thinking_anim))
            except Exception as e:
                print(f"thinking animation failed: {e}")

            # 4) Transcribe
            user_spoken_text = transcribe_audio_groq(aud_path=conversation_recording)
            if not user_spoken_text or (
                isinstance(user_spoken_text, str) and not user_spoken_text.strip()
            ):
                # Nothing intelligible was said: do not send an empty prompt
                # to the model or store it in the history.
                print("[asr] empty transcription, skipping turn")
                continue

            # 5) Build messages history
            messages = load_history()
            messages.append(
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_spoken_text}],
                }
            )

            # 6) Stream model and generate TTS per chunk
            print("[llm] streaming response...")
            reply_parts = []
            set_vrm_state("talking")
            for chunk in stream_text_chunks(messages):
                print("[chunk]", chunk)
                # accumulate final text
                reply_parts.append(chunk)
                # emotion = get_emotion(chunk, None, None)  # plug your emotion model/tokenizer
                # expression = map_emotion_to_expression(emotion)
                # temp implementation: fixed "relaxed" emotion/expression
                try:
                    playback_item = _synthesize_chunk(
                        chunk, emotion="relaxed", expression="relaxed"
                    )
                except Exception as e:
                    # Keep streaming: one failed clip should not drop the rest
                    # of the reply or the history update below.
                    print(f"[tts] chunk skipped: {e}")
                    continue
                if playback_item is not None:
                    playback.enqueue(playback_item)

            # 7) After streaming ends, append the full assistant message to history and save
            final_text = " ".join(reply_parts).strip()
            print("[llm final]", final_text)
            if final_text:
                messages.append(
                    {
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": final_text}],
                    }
                )
                save_history(messages)
            else:
                # Do not persist an unanswered turn or an empty assistant message.
                print("[llm] empty reply, history not updated")

            # Short pause; queued audio is awaited at the top of the next
            # iteration via wait_until_finished().
            time.sleep(0.5)
        except KeyboardInterrupt:
            print("Interrupted by user, stopping.")
            playback.stop()
            break
        except Exception as e:
            # Top-level boundary: report and keep the assistant running.
            print("Error in main loop:", e)
            time.sleep(1)
# Script entry point: start the interactive loop only when this file is run
# directly, not when it is imported.
if __name__ == "__main__":
    main_loop()