343 lines
11 KiB
Python
343 lines
11 KiB
Python
import asyncio
|
|
import os
|
|
import random
|
|
import threading
|
|
import time
|
|
|
|
### transcribe audio
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import soundfile as sf
|
|
from faster_whisper import WhisperModel
|
|
from process.asr_func.asr_auto_record import record_on_speech, transcribe_audio
|
|
from process.asr_func.asr_push_to_talk import record_and_transcribe
|
|
from process.asr_func.asr_text_clean import clean_asr_output
|
|
from process.asr_func.asr_transcribe_groq import transcribe_audio_groq
|
|
from process.extensions.emotion_class_func.emotion_classifier import (
|
|
get_emotion,
|
|
load_emotion_model,
|
|
map_emotion_to_expression,
|
|
)
|
|
from process.faiss_func.faiss_memory import MemoryManager
|
|
from process.llm_funcs.llm_scr import llm_response, llm_response_with_memory
|
|
from process.mcp_funcs.mcp_llm_func import llm_response_mcp
|
|
from process.tts_func.sovits_ping import (
|
|
get_wav_duration,
|
|
play_audio,
|
|
sovits_gen,
|
|
sovits_gen_emotion,
|
|
)
|
|
from process.tts_func.tts_preprocess import clean_llm_output
|
|
from process.vision_func.gemini_vision import describe_image
|
|
from process.vrm_func.vrm_ping import vrm_animate, vrm_talk
|
|
from streaming_chat import animate_and_respond_streaming
|
|
|
|
|
|
def get_wav_duration(path):
    """Return the duration, in seconds, of the audio file at ``path``.

    The file is opened with ``soundfile`` and its length (``len`` of the
    opened file) is divided by its sample rate.
    """
    # NOTE: this definition shadows the ``get_wav_duration`` imported from
    # ``process.tts_func.sovits_ping`` earlier in this module.
    with sf.SoundFile(path) as audio:
        total_frames = len(audio)
        frames_per_second = audio.samplerate
    return total_frames / frames_per_second
|
|
|
|
|
|
# Startup banner: printed when the module is executed or imported, before the
# function definitions below are evaluated.
print(" \n ========= Starting Chat... ================ \n")
|
|
# whisper_model = WhisperModel("base.en", device="cuda", compute_type="float16")
|
|
|
|
|
|
# model_path = "/home/rayenfeng/riko_project_v1/server/models/model_quantized.onnx"
|
|
# emotion_model, tokenizer = load_emotion_model(model_path)
|
|
|
|
# memory = MemoryManager()
|
|
|
|
|
|
def animate_and_respond(user_message, click_interact=False, mode="mcp"):
    """Run one chat turn: play animations, get an LLM reply and speak it.

    Args:
        user_message: Text of the user's turn. It is passed to the LLM as-is.
        click_interact: When true, the "Thinking" and "Talking" body
            animations are not triggered.
        mode: How the reply is produced.
            ``"mcp"`` (default): the whole turn is delegated to
            ``animate_and_respond_streaming`` (model connects to an
            available MCP server; dynamic imports are not supported yet).
            ``"default"``: respond with no tool calling and no memory.
            ``"Memory"``: (TO BE IMPLEMENTED, DO NOT USE) respond with
            long-term memory; the context is currently a placeholder string.

    Raises:
        ValueError: If ``mode`` is not one of the values listed above.
    """
    # Reject unknown modes before any animation starts; otherwise the avatar
    # would be left in the "Thinking" pose and the function would fail later
    # on an unbound ``llm_output``.
    if mode not in ("mcp", "Memory", "default"):
        raise ValueError(
            f"Unknown mode {mode!r}; expected 'mcp', 'Memory' or 'default'"
        )

    if not click_interact:
        # Play a thinking animation while the reply is being produced.
        thinking_clip = Path("animations/mixamo") / "Thinking.fbx"
        vrm_animate("start_mixamo", str(thinking_clip))

    # ASR clean-up (clean_asr_output) is deprecated here because input comes
    # from the UI; it may be re-enabled for a calling mode.
    user_spoken_text = user_message

    print("\n User : ", user_spoken_text)

    if mode == "mcp":
        # Delegate entirely to the streaming pipeline (handles TTS, playback, idle).
        animate_and_respond_streaming(user_message, click_interact)
        return

    # Pass the text to the LLM and get its output.
    if mode == "Memory":
        context = "GET CONTEXT FROM FAISS HERE"
        llm_output = llm_response_with_memory(user_spoken_text, context)
    else:  # mode == "default"
        llm_output = llm_response(user_spoken_text)

    print("Riko : \n", llm_output)

    tts_read_text = clean_llm_output(llm_output)

    # Emotion classification (get_emotion / map_emotion_to_expression) is
    # still under development, so a fixed expression is used for now.
    expression = "relaxed"

    # Generate a unique file name so successive replies never overwrite each
    # other. The file is written under <project root>/client/audio, while
    # ``public_audio_path`` is the relative path handed to ``vrm_talk``.
    filename = f"output_{uuid.uuid4().hex}.wav"
    base_dir = Path(__file__).resolve().parent.parent  # ~/riko_project_v1
    output_wav_path = base_dir / "client" / "audio" / filename
    public_audio_path = Path("audio") / filename
    output_wav_path.parent.mkdir(parents=True, exist_ok=True)

    # Generate the speech audio and save it to client/audio.
    gen_aud_path = sovits_gen_emotion(tts_read_text, expression, output_wav_path)

    print("Saving TTS to: ", gen_aud_path)

    duration = get_wav_duration(output_wav_path)

    # ``vrm_talk`` receives the duration truncated to whole seconds.
    vrm_talk(str(public_audio_path), expression, llm_output, int(duration))

    if not click_interact:
        # Talking animation while the audio plays.
        talking_clip = Path("animations/mixamo") / "Talking.fbx"
        vrm_animate("start_mixamo", str(talking_clip))

    # NOTE(review): the original indentation was lost in this copy of the
    # file; the wait and the return to idle are assumed to run for every
    # call, not only when ``click_interact`` is false - confirm.
    print("waiting for audio to finish...")
    time.sleep(duration)

    idle_clip = Path("animations/mixamo") / "Idle.fbx"
    vrm_animate("start_mixamo", str(idle_clip))
|
|
|
|
|
|
def transcribe_and_respond(audio_file_path):
    """Transcribe an audio file and answer the transcript as a chat turn.

    ``audio_file_path`` is converted to ``str`` and handed to
    ``transcribe_audio_groq``; the resulting text is then passed to
    ``animate_and_respond`` with its default arguments.
    """
    # A local Whisper path (transcribe_audio with whisper_model) existed here
    # previously; transcription now goes through transcribe_audio_groq.
    transcript = transcribe_audio_groq(aud_path=str(audio_file_path))
    animate_and_respond(transcript)
|
|
|
|
|
|
async def transcribe_audio_wrapper(audio_file_path):
    """Transcribe an audio file without blocking the running event loop.

    Args:
        audio_file_path: Path (``str`` or path-like) of the audio file; it is
            converted to ``str`` before being passed on.

    Returns:
        Whatever ``transcribe_audio_groq`` returns for the file
        (the transcribed user message).
    """
    audio_file_path = str(audio_file_path)
    # ``transcribe_audio_groq`` is a regular blocking call; running it
    # directly inside this coroutine would stall every other task on the
    # loop, so it is pushed to the default thread-pool executor instead.
    loop = asyncio.get_running_loop()
    user_message = await loop.run_in_executor(
        None, lambda: transcribe_audio_groq(aud_path=audio_file_path)
    )
    return user_message
|
|
|
|
|
|
def describe_image_and_respond(image_file_path, additional_text=None):
    """Describe an image and answer the description as a chat turn.

    The image at ``image_file_path`` is passed to ``describe_image``. The
    description is wrapped in an ``[Image description: ...]`` block, prefixed
    with ``additional_text`` when that is given and non-empty, and the result
    is sent to ``animate_and_respond``.
    """
    image_message = describe_image(image_file_path)

    # No accompanying text: the message is just the image description.
    if not additional_text:
        animate_and_respond(f"[Image description: \n {image_message}]")
        return

    # The user also typed something: put it in front of the description.
    print(f"Processing image with additional context: {additional_text}")
    animate_and_respond(
        f"{additional_text} \n [Image description: \n {image_message}]"
    )
|
|
|
|
|
|
# quick helper to schedule a return to idle
|
|
def schedule_idle(delay: float = 2.0):
    """Start a timer that switches the avatar to the idle animation.

    A ``threading.Timer`` is started and, after ``delay`` seconds, calls
    ``vrm_animate`` with the mixamo ``Idle.fbx`` clip. The call returns
    immediately; nothing is returned.
    """

    def _back_to_idle():
        idle_clip = Path("animations/mixamo") / "Idle.fbx"
        vrm_animate("start_mixamo", str(idle_clip))

    idle_timer = threading.Timer(delay, _back_to_idle)
    idle_timer.start()
|
|
|
|
|
|
def _select_click_reaction(region, bone):
    """Return ``(animation_file, crop_start, idle_delay)`` for a clicked spot.

    Rules are checked in order and the first match wins; anything that
    matches no rule gets the default playful reaction.
    """
    # Touch torso (exact region match).
    if region in {"chest", "bust", "belly"}:
        return "woah.vrma", 0.85, 3.0

    # Touch ears (matched on the bone name, not the region).
    if bone in {"left_cat_ear", "right_cat_ear"}:
        return "touch_ears.vrma", 0.95, 3.0

    # Limb rules are substring matches against the region name.
    right_parts = (
        "right_hand",
        "right_arm",
        "right_shoulder",
        "right_thigh",
        "right_shin",
        "right_foot",
    )
    if any(part in region for part in right_parts):
        return "lookright.vrma", 0.72, 2.2

    left_parts = (
        "left_hand",
        "left_arm",
        "left_shoulder",
        "left_thigh",
        "left_shin",
        "left_foot",
    )
    if any(part in region for part in left_parts):
        return "lookleft.vrma", 0.82, 2.1

    # Touch head/hair (exact region match).
    if region in {"head", "neck", "hair"}:
        return "headpat_cover.vrma", 0.72, 2.8

    # Default playful reaction.
    return "stop_it.vrma", 0.82, 1.7


def handle_click_interaction(payload: dict):
    """React to a click on the avatar with a sound and a matching animation.

    Args:
        payload: Click event data. ``payload["region"]`` (lower-cased before
            matching) and ``payload["bone"]`` select the reaction; both are
            optional, and a missing or ``None`` region is treated as ``""``.

    Side effects: plays a random feedback sound via ``vrm_talk``, starts a
    VRMA animation via ``vrm_animate`` and schedules a return to idle via
    ``schedule_idle``.
    """
    # ``or ""`` also covers an explicit ``None`` (e.g. JSON null), which
    # ``payload.get("region", "")`` would pass straight to ``.lower()``.
    region = (payload.get("region") or "").lower()
    bone = payload.get("bone", "")

    print("🎯 Playing animation for", region, "bone:", bone)

    # --- 1. Immediate feedback: random talk sound ---
    feedback_sounds = [
        ("oh.wav", "oh?"),
        ("hey.wav", "hey"),
    ]
    sound_file, text = random.choice(feedback_sounds)
    soundpath = Path("public/sounds") / sound_file
    vrm_talk(str(soundpath), "relaxed", text, 1)

    # --- 2. Pick animation based on region/bone, play it, then go idle ---
    animation_file, crop_start, idle_delay = _select_click_reaction(region, bone)
    anim = Path("animations/vrma_xr") / animation_file
    vrm_animate(
        "start_vrma",
        str(anim),
        play_once=False,
        crop_start=crop_start,
        crop_end=0.0,
    )
    schedule_idle(idle_delay)
|
|
|
|
|
|
# Printed once every definition above has been evaluated (also at import time).
print(" \n ========= Finsihed Starting Chat... ================ \n")
|
|
|
|
if __name__ == "__main__":
    # Standalone voice loop: record speech, transcribe it locally, ask the
    # LLM, synthesise the reply and animate the avatar - forever.

    # The local models are only needed by this loop, so they are loaded here
    # rather than at import time (their module-level initialisation is
    # commented out, which previously made this loop fail with NameError).
    whisper_model = WhisperModel("base.en", device="cuda", compute_type="float16")

    # NOTE(review): the original hard-coded path was
    # "/home/rayenfeng/riko_project_v1/server/models/model_quantized.onnx";
    # this assumes the script lives in that "server" directory - confirm.
    model_path = Path(__file__).resolve().parent / "models" / "model_quantized.onnx"
    emotion_model, tokenizer = load_emotion_model(str(model_path))

    # The same recording file is reused for every turn.
    conversation_recording = Path("audio") / "conversation.wav"
    conversation_recording.parent.mkdir(parents=True, exist_ok=True)

    while True:
        # Play an idle animation while waiting for the user to speak.
        animationpath = Path("animations/mixamo") / "Idle.fbx"
        vrm_animate("start_mixamo", str(animationpath))

        record_on_speech(
            output_file=conversation_recording,
            samplerate=44100,
            channels=1,
            silence_threshold=0.02,  # Adjust based on your microphone sensitivity
            silence_duration=1,  # Stop after 1 second of silence
            device=None,  # Use default device, or specify by ID or name
        )

        # Play a thinking animation while the reply is being produced.
        animationpath = Path("animations/mixamo") / "Thinking.fbx"
        vrm_animate("start_mixamo", str(animationpath))

        # Transcribe the recording. Push-to-talk alternative:
        # record_and_transcribe(whisper_model, conversation_recording)
        user_spoken_text = transcribe_audio(
            whisper_model, aud_path=conversation_recording
        )
        user_spoken_text = clean_asr_output(user_spoken_text)

        print("\n User : ", user_spoken_text)

        # Memory lookup (memory.get_context_block + llm_response_with_memory)
        # is disabled; the LLM is queried without long-term context.
        llm_output = llm_response(user_spoken_text)

        print("Riko : \n", llm_output)

        tts_read_text = clean_llm_output(llm_output)

        # Calculate the emotion of the reply and map it to an expression.
        emotion = get_emotion(llm_output, emotion_model, tokenizer)
        expression = map_emotion_to_expression(emotion)

        print("Expression : \n", expression)

        # Generate a unique file name for this reply's audio.
        uid = uuid.uuid4().hex
        filename = f"output_{uid}.wav"
        output_wav_path = Path("client", "audio") / filename
        public_audio_path = Path("audio") / filename
        output_wav_path.parent.mkdir(parents=True, exist_ok=True)

        # Generate audio and save it to client/audio.
        gen_aud_path = sovits_gen_emotion(tts_read_text, expression, output_wav_path)

        duration = get_wav_duration(output_wav_path)

        # ``vrm_talk`` receives the duration truncated to whole seconds.
        vrm_talk(str(public_audio_path), expression, llm_output, int(duration))

        # Talking animation while the audio plays.
        animationpath = Path("animations/mixamo") / "Talking.fbx"
        vrm_animate("start_mixamo", str(animationpath))

        print("waiting for audio to finish...")
        time.sleep(duration)

        # Generated audio files are currently not cleaned up, e.g.:
        # [fp.unlink() for fp in Path("audio").glob("*.wav") if fp.is_file()]
|