# Ai_Assistant/server/_archive/main_chat_loop.py
# 2026-05-24 13:31:30 +02:00
#
# 343 lines
# 11 KiB
# Python

import asyncio
import os
import random
import threading
import time
### transcribe audio
import uuid
from pathlib import Path
import soundfile as sf
from faster_whisper import WhisperModel
from process.asr_func.asr_auto_record import record_on_speech, transcribe_audio
from process.asr_func.asr_push_to_talk import record_and_transcribe
from process.asr_func.asr_text_clean import clean_asr_output
from process.asr_func.asr_transcribe_groq import transcribe_audio_groq
from process.extensions.emotion_class_func.emotion_classifier import (
get_emotion,
load_emotion_model,
map_emotion_to_expression,
)
from process.faiss_func.faiss_memory import MemoryManager
from process.llm_funcs.llm_scr import llm_response, llm_response_with_memory
from process.mcp_funcs.mcp_llm_func import llm_response_mcp
from process.tts_func.sovits_ping import (
get_wav_duration,
play_audio,
sovits_gen,
sovits_gen_emotion,
)
from process.tts_func.tts_preprocess import clean_llm_output
from process.vision_func.gemini_vision import describe_image
from process.vrm_func.vrm_ping import vrm_animate, vrm_talk
from streaming_chat import animate_and_respond_streaming
def get_wav_duration(path):
    """Return the duration, in seconds, of the audio file at ``path``.

    The file is opened with ``soundfile`` and the duration is its frame
    count divided by its sample rate.

    Note: this definition shadows the ``get_wav_duration`` imported from
    ``process.tts_func.sovits_ping`` earlier in this module.
    """
    with sf.SoundFile(path) as audio:
        total_frames = len(audio)
        rate = audio.samplerate
    return total_frames / rate
# Startup banner; runs at module level, so it prints on import as well as
# when the file is executed as a script.
print(" \n ========= Starting Chat... ================ \n")
# Disabled module-level initialisation of the Whisper model, the emotion
# classifier (model + tokenizer) and the memory manager. While these lines
# stay commented out, nothing at module level defines ``whisper_model``,
# ``emotion_model``, ``tokenizer`` or ``memory``.
# whisper_model = WhisperModel("base.en", device="cuda", compute_type="float16")
# model_path = "/home/rayenfeng/riko_project_v1/server/models/model_quantized.onnx"
# emotion_model, tokenizer = load_emotion_model(model_path)
# memory = MemoryManager()
# Values of ``mode`` that animate_and_respond knows how to handle.
_RESPONSE_MODES = ("mcp", "Memory", "default")


def _start_mixamo_animation(clip_name):
    """Ask the VRM client to start the named clip from ``animations/mixamo``."""
    vrm_animate("start_mixamo", str(Path("animations/mixamo") / clip_name))


def animate_and_respond(user_message, click_interact=False, mode="mcp"):
    """Send ``user_message`` to the LLM and drive the avatar's spoken reply.

    Args:
        user_message: Text to send to the language model.
        click_interact: When truthy, the "Thinking" and "Talking" body
            animations are skipped.
        mode: Which response pipeline to use.
            ``"mcp"`` (default) hands the whole turn to
            ``animate_and_respond_streaming`` and returns. Dynamic imports
            are not supported yet.
            ``"default"`` replies via ``llm_response`` (no tool calling, no
            memory).
            ``"Memory"`` replies via ``llm_response_with_memory``. TO BE
            IMPLEMENTED, DO NOT USE: the context passed is still a
            placeholder string.

    Raises:
        ValueError: If ``mode`` is not one of the three values above.
    """
    # Reject unknown modes before any animation starts. Previously such a
    # call fell through both branches and crashed on an unbound
    # ``llm_output`` after the "Thinking" animation had already begun.
    if mode not in _RESPONSE_MODES:
        raise ValueError(
            f"Unknown mode {mode!r}; expected one of {_RESPONSE_MODES}"
        )

    if not click_interact:
        _start_mixamo_animation("Thinking.fbx")

    # ASR clean-up (clean_asr_output) is not applied here because the text
    # comes from the UI; it may need re-enabling for a calling mode.
    user_spoken_text = user_message
    print("\n User : ", user_spoken_text)

    if mode == "mcp":
        # Delegate entirely to the streaming pipeline (handles TTS, playback,
        # idle), so nothing below runs in this mode.
        animate_and_respond_streaming(user_message, click_interact)
        return

    if mode == "Memory":
        context = "GET CONTEXT FROM FAISS HERE"
        llm_output = llm_response_with_memory(user_spoken_text, context)
    else:  # mode == "default"
        llm_output = llm_response(user_spoken_text)
    print("Riko : \n", llm_output)
    tts_read_text = clean_llm_output(llm_output)

    # Emotion classification is currently under development, so the
    # expression is fixed. To re-enable once stable:
    # emotion = get_emotion(llm_output, emotion_model, tokenizer)
    # expression = map_emotion_to_expression(emotion)
    expression = "relaxed"

    # Each reply gets a unique file name so clips never overwrite each other.
    filename = f"output_{uuid.uuid4().hex}.wav"
    # NOTE(review): this resolves two directories above this file; the
    # original comment expected that to be the project root
    # (~/riko_project_v1) - confirm this still holds where the file lives now.
    base_dir = Path(__file__).resolve().parent.parent
    output_wav_path = base_dir / "client" / "audio" / filename
    # Path handed to the VRM client, relative to what it serves as "audio".
    public_audio_path = Path("audio") / filename
    output_wav_path.parent.mkdir(parents=True, exist_ok=True)

    # Generate the speech audio and save it under client/audio.
    gen_aud_path = sovits_gen_emotion(tts_read_text, expression, output_wav_path)
    print("Saving TTS to: ", gen_aud_path)

    duration = get_wav_duration(output_wav_path)
    # The duration is passed to the client truncated to whole seconds.
    vrm_talk(str(public_audio_path), expression, llm_output, int(duration))

    if not click_interact:
        _start_mixamo_animation("Talking.fbx")

    # NOTE(review): the wait and the return to idle are assumed to apply to
    # click interactions as well - confirm against the intended behaviour.
    print("waiting for audio to finish...")
    time.sleep(duration)
    _start_mixamo_animation("Idle.fbx")
def transcribe_and_respond(audio_file_path):
    """Transcribe an audio file and respond to the resulting text.

    The path is converted to ``str``, transcribed with
    ``transcribe_audio_groq`` and the transcript is passed to
    ``animate_and_respond`` with its default arguments.
    """
    # Local Whisper alternative, currently disabled:
    # transcribe_audio(whisper_model, aud_path=...)
    transcript = transcribe_audio_groq(aud_path=str(audio_file_path))
    animate_and_respond(transcript)
async def transcribe_audio_wrapper(audio_file_path):
    """Transcribe an audio file without blocking the event loop.

    ``transcribe_audio_groq`` is a synchronous call (this module also invokes
    it without ``await``), so running it directly inside a coroutine would
    stall every other task until it finishes. It is therefore run in the
    event loop's default executor and awaited.

    Args:
        audio_file_path: Path of the audio file; converted to ``str``.

    Returns:
        Whatever ``transcribe_audio_groq`` returns for the file.
    """
    audio_file_path = str(audio_file_path)
    loop = asyncio.get_running_loop()
    # Local Whisper alternative, currently disabled:
    # transcribe_audio(whisper_model, aud_path=audio_file_path)
    return await loop.run_in_executor(
        None, lambda: transcribe_audio_groq(aud_path=audio_file_path)
    )
def describe_image_and_respond(image_file_path, additional_text=None):
    """Describe an image and respond to it, optionally with user text.

    The image is described with ``describe_image``. If ``additional_text``
    is truthy it is placed before the bracketed image description; otherwise
    the description alone is sent. Either way the message goes to
    ``animate_and_respond``.
    """
    image_message = describe_image(image_file_path)

    # No accompanying text: send only the image description.
    if not additional_text:
        animate_and_respond(f"[Image description: \n {image_message}]")
        return

    print(f"Processing image with additional context: {additional_text}")
    animate_and_respond(
        f"{additional_text} \n [Image description: \n {image_message}]"
    )
# Helper that schedules the avatar's return to the idle animation.
def schedule_idle(delay: float = 2.0):
    """Start a one-shot timer that switches the avatar back to idle.

    After ``delay`` seconds a ``threading.Timer`` calls ``vrm_animate`` with
    the ``start_mixamo`` command and the ``Idle.fbx`` clip. This function
    returns as soon as the timer is started and does not keep a reference
    to it.
    """

    def _back_to_idle():
        idle_clip = Path("animations/mixamo") / "Idle.fbx"
        vrm_animate("start_mixamo", str(idle_clip))

    idle_timer = threading.Timer(delay, _back_to_idle)
    idle_timer.start()
# Region substrings that count as a touch on the avatar's right / left side.
_RIGHT_SIDE_PARTS = (
    "right_hand",
    "right_arm",
    "right_shoulder",
    "right_thigh",
    "right_shin",
    "right_foot",
)
_LEFT_SIDE_PARTS = (
    "left_hand",
    "left_arm",
    "left_shoulder",
    "left_thigh",
    "left_shin",
    "left_foot",
)


def _select_click_reaction(region, bone):
    """Return ``(clip_name, crop_start, idle_delay)`` for a clicked body part.

    Checks run in priority order: torso, cat ears, right side, left side,
    head, then a default reaction. Torso and head need an exact region
    match, the side checks match any listed part as a substring of
    ``region``, and the ear check is the only one that looks at ``bone``.
    """
    if region in {"chest", "bust", "belly"}:
        return "woah.vrma", 0.85, 3.0
    if bone in {"left_cat_ear", "right_cat_ear"}:
        return "touch_ears.vrma", 0.95, 3.0
    if any(part in region for part in _RIGHT_SIDE_PARTS):
        return "lookright.vrma", 0.72, 2.2
    if any(part in region for part in _LEFT_SIDE_PARTS):
        return "lookleft.vrma", 0.82, 2.1
    if region in {"head", "neck", "hair"}:
        return "headpat_cover.vrma", 0.72, 2.8
    # Default playful reaction for anything unrecognised.
    return "stop_it.vrma", 0.82, 1.7


def handle_click_interaction(payload: dict):
    """React to a click on the avatar with a short sound and an animation.

    Args:
        payload: Click event data. The optional ``"region"`` key is matched
            case-insensitively; the optional ``"bone"`` key is matched as
            given. A missing or ``None`` region is treated as empty and gets
            the default reaction unless the bone matches.
    """
    # ``or ""`` also covers an explicit ``None`` value, which the default
    # argument of ``dict.get`` does not replace.
    region = (payload.get("region") or "").lower()
    bone = payload.get("bone", "")
    print("🎯 Playing animation for", region, "bone:", bone)

    # --- 1. Immediate feedback: random talk sound ---
    feedback_sounds = [
        ("oh.wav", "oh?"),
        ("hey.wav", "hey"),
    ]
    sound_file, text = random.choice(feedback_sounds)
    soundpath = Path("public/sounds") / sound_file
    vrm_talk(str(soundpath), "relaxed", text, 1)

    # --- 2. Pick animation based on region/bone, then return to idle ---
    clip_name, crop_start, idle_delay = _select_click_reaction(region, bone)
    anim = Path("animations/vrma_xr") / clip_name
    vrm_animate(
        "start_vrma",
        str(anim),
        play_once=False,
        crop_start=crop_start,
        crop_end=0.0,
    )
    schedule_idle(idle_delay)
print(" \n ========= Finsihed Starting Chat... ================ \n")


def main():
    """Run the standalone microphone chat loop until the process is stopped.

    Each turn: show the idle animation, record speech to
    ``audio/conversation.wav``, transcribe it with the Whisper model, get an
    LLM reply, classify its emotion, synthesise speech to
    ``client/audio/output_<uuid>.wav`` and play the talking animation for the
    length of the clip.
    """
    # The loop needs these objects, but their module-level initialisation is
    # commented out, which made the loop fail with NameError. Load them here,
    # using the same arguments as the disabled lines.
    whisper_model = WhisperModel("base.en", device="cuda", compute_type="float16")
    # NOTE(review): machine-specific absolute path taken from the disabled
    # initialisation code - adjust for the local checkout.
    model_path = "/home/rayenfeng/riko_project_v1/server/models/model_quantized.onnx"
    emotion_model, tokenizer = load_emotion_model(model_path)

    # The same recording file is reused (overwritten) on every turn.
    conversation_recording = Path("audio") / "conversation.wav"
    conversation_recording.parent.mkdir(parents=True, exist_ok=True)

    while True:
        ## play an idle animation
        animationpath = Path("animations/mixamo") / "Idle.fbx"
        vrm_animate("start_mixamo", str(animationpath))

        record_on_speech(
            output_file=conversation_recording,
            samplerate=44100,
            channels=1,
            silence_threshold=0.02,  # Adjust based on your microphone sensitivity
            silence_duration=1,  # Stop after 1 second of silence
            device=None,  # Use default device, or specify by ID or name
        )

        ### play a thinking animation
        animationpath = Path("animations/mixamo") / "Thinking.fbx"
        vrm_animate("start_mixamo", str(animationpath))

        user_spoken_text = transcribe_audio(
            whisper_model, aud_path=conversation_recording
        )
        #### push-to-talk alternative:
        # user_spoken_text = record_and_transcribe(whisper_model, conversation_recording)
        user_spoken_text = clean_asr_output(user_spoken_text)
        print("\n User : ", user_spoken_text)

        # Memory lookup is disabled:
        # context = memory.get_context_block(user_spoken_text, top_k= 2)
        # print("Memories : \n", context)

        ### pass to LLM and get a LLM output.
        llm_output = llm_response(user_spoken_text)
        # llm_output = llm_response_with_memory(user_spoken_text,context)
        print("Riko : \n", llm_output)
        tts_read_text = clean_llm_output(llm_output)

        ### calculate emotion
        emotion = get_emotion(llm_output, emotion_model, tokenizer)
        expression = map_emotion_to_expression(emotion)
        print("Expression : \n", expression)

        ### file organization: unique file name per reply
        uid = uuid.uuid4().hex
        filename = f"output_{uid}.wav"
        output_wav_path = Path("client", "audio") / filename
        public_audio_path = Path("audio") / filename
        output_wav_path.parent.mkdir(parents=True, exist_ok=True)

        ### generate audio and save it to client/audio
        # gen_aud_path = sovits_gen(tts_read_text,output_wav_path)
        gen_aud_path = sovits_gen_emotion(tts_read_text, expression, output_wav_path)

        duration = get_wav_duration(output_wav_path)
        # The duration is passed to the client truncated to whole seconds.
        vrm_talk(str(public_audio_path), expression, llm_output, int(duration))

        ## talking animation
        animationpath = Path("animations/mixamo") / "Talking.fbx"
        vrm_animate("start_mixamo", str(animationpath))
        print("waiting for audio to finish...")
        time.sleep(duration)

        # Generated audio files are not cleaned up; disabled cleanup:
        # [fp.unlink() for fp in Path("audio").glob("*.wav") if fp.is_file()]


if __name__ == "__main__":
    main()