# Ai_Assistant/server/_archive/main_chat_loop_STREAMINGV8-MOVEMENT.py
#
# 886 lines
# 32 KiB
# Python
# (exported from a repository web viewer: Raw | Normal View | History)
#
# 2026-05-24 13:31:30 +02:00
#!/usr/bin/env python3
"""
LATEST VERSION STREAMING V8 WITH FUNCTION CALLING
Update 2025-12-25
Streaming LLM with ordered MCP function calling and TTS playback.
This script ensures:
1. Text chunks and function calls are processed in order
2. Audio does not overlap
3. Functions are executed before the next audio segment when needed
4. Audio playback is synchronized with function calling
5. Microphone only starts recording AFTER the complete queue is finished
6. MCP tool lists and tool calls from every response turn are saved to the conversation history, for more accurate and faster tool calling.
"""
## NOTE: COMPATIBLE WITH /home/rayenfeng/MCP_functions/MotionDiffusion/motion_picker.py
import json
import os
import shutil
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from queue import Queue
from threading import Event, Lock, Thread
from typing import Any, Dict, List, Optional

import requests
import yaml
from dotenv import load_dotenv
from openai import OpenAI

# Import your existing functions
from process.asr_func.asr_transcribe_groq import record_on_speech, transcribe_audio_groq
from process.tts_func.sovits_ping import get_wav_duration, sovits_gen_emotion
from process.tts_func.tts_preprocess import clean_llm_output
from process.vrm_func.vrm_ping import vrm_animate, vrm_talk
# ==================== Configuration ====================
# Load environment variables (e.g. OPENAI_API_KEY) from a .env file.
load_dotenv()
CONFIG_PATH = os.path.expanduser("~/riko_project_v1/character_config.yaml")
MCP_CONFIG_PATH = Path.home() / "MCP_functions" / "mcp_config.json"
# Character config; must provide at least "history_file" and "model".
# Explicit UTF-8 so non-ASCII config text does not depend on the OS locale.
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    char_config = yaml.safe_load(f)
# CASE_PROMPT = """You are a helpful assistant with a physical avatar body. You can both speak (by outputting text) and perform physical actions (by calling the perform_motion_action tool).
# IMPORTANT: You MUST interleave your speech and actions naturally in order. For example:
# - To say something THEN do a backflip THEN say something else, you must output text first, then call the tool, then output more text — all in the correct sequence.
# - Do NOT batch all tool calls together. Alternate between text and tool calls as the situation requires.
# - Do NOT use the tool for speaking — just write your words as normal text.
# """
# System prompt: character persona plus instructions for interleaving speech
# (plain text output) with avatar actions (the perform_motion_action tool).
CASE_PROMPT = """You are Riko. Riko is a clever and charming anime girl who playfully teases Rayen in conversation. She has a smug, confident attitude, but deep down, she cares. Her tone is flirtatiously snarky—like a mischievous best friend or a cheeky little sister. Riko is quick-witted, emotionally expressive, and always playfully teasing. She never breaks character, always responds with playful banter, and knows how to win an argument with style. She avoids being genuinely mean or cruel—her teasing is lighthearted and fun.
You are talking with Rayen
You have a tool that allows you to perform simple actions such as to wave, walk to Rayen, dropkick, backflip, loser emote, kiss etc.
**Call the tool only when performing actions,You can perform actions at any time regardless if Rayen prompts you to so. You can purposely ignore or perform actions that rayen does not tell you to perform to mess with him** You can both speak (by outputting text) and perform physical actions (by calling the perform_motion_action tool).
IMPORTANT: You MUST interleave your speech and actions naturally in order. For example:
- To say something THEN do a backflip THEN say something else, you must output text first, then call the tool, then output more text all in the correct sequence.
- Do NOT batch all tool calls together. Alternate between text and tool calls as the situation requires.
- Do NOT use the tool for speaking just write your words as normal text.
"""
HISTORY_FILE = char_config["history_file"]
# MODEL = 'gpt-4.1'
MODEL = char_config["model"]
# Initial conversation used when no history file exists yet.
SYSTEM_PROMPT = [
    {
        "role": "system",
        "content": [
            {
                "type": "input_text",
                "text": CASE_PROMPT,
                # "text": char_config["presets"]["default"]["system_prompt"],
            }
        ],
    }
]
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# ==================== MCP Configuration ====================
def load_mcp_config():
    """
    Read the MCP connection settings from MCP_CONFIG_PATH.

    Returns:
        The parsed JSON object, or None when the file does not exist, cannot
        be read, or is not valid JSON.
    """
    config_path = MCP_CONFIG_PATH
    if not config_path.exists():
        return None
    try:
        with config_path.open() as handle:
            config = json.load(handle)
    except (json.JSONDecodeError, OSError):
        return None
    return config
def get_all_tools_metadata() -> Optional[List[Dict[str, Any]]]:
    """
    Get complete metadata JSON for all MCP tools (JSON-RPC ``tools/list``).

    Accepts either an SSE (``text/event-stream``) reply or a plain JSON reply.

    Returns:
        The ``tools`` entry of the JSON-RPC result (or the whole result if it
        has no ``tools`` key), or None if the MCP config is missing, the server
        reports a JSON-RPC error, an SSE stream ends without a result, or any
        exception is raised.
    """
    mcp_config = load_mcp_config()
    if not mcp_config:
        return None
    try:
        server_url = mcp_config["url"].rstrip("/")
        access_token = mcp_config["token"]
        mcp_endpoint = f"{server_url}/mcp/"
        payload = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}
        # With stream=True the connection stays checked out until the body is
        # fully read or the response is closed; the ``with`` releases it even
        # when we return from the middle of an SSE stream.
        with requests.post(
            mcp_endpoint,
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream",
            },
            json=payload,
            timeout=10,
            stream=True,
        ) as response:
            response.raise_for_status()
            content_type = response.headers.get("content-type", "")
            if content_type.startswith("text/event-stream"):
                # Parse SSE stream: each event payload is on a "data: " line.
                for line in response.iter_lines():
                    if not line:
                        continue
                    decoded = line.decode("utf-8")
                    if not decoded.startswith("data: "):
                        continue
                    try:
                        event_data = json.loads(decoded[6:])
                    except json.JSONDecodeError:
                        continue
                    if "error" in event_data:
                        return None
                    if event_data.get("jsonrpc") == "2.0" and "result" in event_data:
                        result = event_data["result"]
                        return result.get("tools", result)
                return None
            result = response.json()
            if "error" in result:
                return None
            if "result" in result:
                return result["result"].get("tools", result["result"])
            return result
    except Exception:
        # Best effort: callers treat None as "metadata unavailable".
        return None
def get_metadata_field(
    function_name: str,
    field_path: str,
    tools: Optional[List[Dict[str, Any]]] = None,
) -> Optional[Any]:
    """
    Get a specific ``_meta`` field value for a given MCP tool.

    Args:
        function_name: Name of the function (e.g., "play_sound_effect").
        field_path: Dot-notation path inside the tool's ``_meta`` dict
            (e.g., "tool_type" or "_fastmcp.tags").
        tools: Optional pre-fetched tool list, in the shape returned by
            get_all_tools_metadata(). When omitted, the list is fetched from
            the MCP server (one HTTP request per call); pass it in to look up
            several fields with a single fetch.

    Returns:
        The value of the field, or None if the tool list is unavailable, the
        tool is not found, or the path does not exist.

    Example:
        get_metadata_field("play_sound_effect", "tool_type") returns
        "needs_sync" when the server tags that tool that way.
    """
    all_tools = tools if tools is not None else get_all_tools_metadata()
    if not all_tools:
        return None
    # Only dict entries can be tools. get_all_tools_metadata may fall back to
    # returning the raw result object, and iterating that yields non-dicts.
    tool = next(
        (
            t
            for t in all_tools
            if isinstance(t, dict) and t.get("name") == function_name
        ),
        None,
    )
    if tool is None:
        return None
    # Walk the dot-separated path, starting from the tool's _meta dict.
    current = tool.get("_meta", {})
    for part in field_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current
def get_manual_mcp_response(tool_name: str, arguments: dict):
    """
    Manually call an MCP tool using the MCP protocol (JSON-RPC ``tools/call``).

    Args:
        tool_name: Name of the tool to call (e.g., "play_sound_effect").
        arguments: Dictionary of arguments for the tool
            (e.g., {"sound_type": "bong", "manual_call": True}).

    Returns:
        The JSON-RPC ``result`` payload (or the raw JSON body if it has no
        ``result`` key), or None if the config is missing, the request fails,
        the server reports an error, or an SSE stream ends without a result.
    """
    mcp_config = load_mcp_config()
    if not mcp_config:
        print("⚠️ MCP config unavailable")
        return None
    try:
        server_url = mcp_config["url"].rstrip("/")
        access_token = mcp_config["token"]
        # MCP protocol endpoint
        mcp_endpoint = f"{server_url}/mcp/"
        # Construct MCP JSON-RPC call
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {"name": tool_name, "arguments": arguments},
        }
        # stream=True is needed for SSE. The ``with`` closes the response and
        # returns the connection to the pool even on an early return.
        with requests.post(
            mcp_endpoint,
            headers={
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream",
            },
            json=payload,
            timeout=10,
            stream=True,
        ) as response:
            response.raise_for_status()
            if response.headers.get("content-type", "").startswith("text/event-stream"):
                # Parse SSE stream: each event payload is on a "data: " line.
                for line in response.iter_lines():
                    if not line:
                        continue
                    decoded = line.decode("utf-8")
                    if not decoded.startswith("data: "):
                        continue
                    try:
                        event_data = json.loads(decoded[6:])  # Remove 'data: ' prefix
                    except json.JSONDecodeError:
                        continue
                    # Check for JSON-RPC error
                    if "error" in event_data:
                        print(f"⚠️ MCP error: {event_data['error']}")
                        return None
                    # Return the result when found
                    if event_data.get("jsonrpc") == "2.0" and "result" in event_data:
                        return event_data["result"]
                return None
            # Handle regular JSON response (fallback)
            result = response.json()
            if "error" in result:
                print(f"⚠️ MCP error: {result['error']}")
                return None
            if "result" in result:
                return result["result"]
            return result
    except requests.exceptions.RequestException as e:
        print(f"⚠️ MCP call failed: {e}")
        return None
    except Exception as e:
        print(f"⚠️ Unexpected error: {e}")
        return None
# ==================== Item Types ====================
class ItemType(Enum):
    """The two kinds of work the ordered playback worker understands."""

    # A chunk of assistant text with pre-generated TTS audio.
    TEXT = "text"
    # An MCP tool invocation that must happen at this point in the sequence.
    FUNCTION_CALL = "function_call"
@dataclass
class PlaybackItem:
    """
    One unit of work for the ordered playback queue.

    Attributes:
        item_type: TEXT for spoken audio, FUNCTION_CALL for a tool invocation.
        content: The spoken text (TEXT) or the tool name (FUNCTION_CALL).
        arguments: Tool arguments for FUNCTION_CALL items; None otherwise.
        audio_path: Generated audio file for TEXT items; None otherwise.
        expression: Expression name sent to the avatar with the audio.
        duration: Audio length in seconds; the worker waits this long.
        needs_sync: True when the tool must be run by the worker in sequence
            with the speech, instead of having been run during streaming.
    """

    item_type: ItemType
    content: str
    arguments: Optional[Dict] = None
    audio_path: Optional[Path] = None
    expression: str = "relaxed"
    duration: float = 0.0
    needs_sync: bool = False
# ==================== History Management ====================
def load_history():
    """
    Return the stored conversation, or a fresh history for a new chat.

    When HISTORY_FILE does not exist, a shallow copy of SYSTEM_PROMPT is
    returned, so appending to the result never mutates the module constant.
    """
    if not os.path.exists(HISTORY_FILE):
        return SYSTEM_PROMPT.copy()
    with open(HISTORY_FILE, "r") as history_fp:
        return json.load(history_fp)
def save_history(history):
    """
    Persist the conversation history to HISTORY_FILE as indented JSON.

    The JSON is written to a sibling ``.tmp`` file first and then moved over
    HISTORY_FILE with os.replace. An interruption or serialization error
    mid-write therefore leaves the previous history intact. Writing in place
    would leave a truncated file that load_history could no longer parse.

    Args:
        history: JSON-serializable list of conversation items.
    """
    tmp_path = f"{HISTORY_FILE}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(history, f, indent=2)
        os.replace(tmp_path, HISTORY_FILE)
    finally:
        # Only still present if something failed before os.replace.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
# ==================== Playback Worker ====================
class OrderedPlaybackWorker:
    """
    Worker that processes text and function calls in strict order.

    A single daemon thread consumes the queue one item at a time. Audio
    segments therefore never overlap, and function calls run exactly where
    they were enqueued relative to the surrounding speech.

    Completion is tracked with a pending-item counter guarded by a lock.
    ``queue_finished_event`` is set only once every enqueued item has been
    fully processed. Checking ``Queue.empty()`` and then setting the event
    could race with a concurrent ``enqueue`` and report "finished" while an
    item was still waiting.
    """

    def __init__(self):
        self.queue = Queue()
        self.thread = Thread(target=self._run, daemon=True)
        self._running = False
        self._talking = False
        self._pending = 0  # enqueued items not yet fully processed
        self._pending_lock = Lock()  # guards _pending and event transitions
        self.queue_finished_event = Event()  # set while nothing is pending
        self.queue_finished_event.set()  # Start as finished (no items)

    def start(self):
        """Start the worker thread; calling it again is a no-op."""
        if not self._running:
            self._running = True
            self.thread.start()

    def enqueue(self, item: "PlaybackItem"):
        """Add an item to the playback queue and mark the queue as busy."""
        with self._pending_lock:
            self._pending += 1
            self.queue_finished_event.clear()
        self.queue.put(item)

    def wait_until_finished(self, timeout=None):
        """
        Wait until every enqueued item has been processed.

        Args:
            timeout: Maximum time to wait in seconds (None = wait forever)

        Returns:
            True if queue finished, False if timeout occurred
        """
        return self.queue_finished_event.wait(timeout)

    def _run(self):
        """Main playback loop: process items in strict order until stop()."""
        while True:
            item = self.queue.get()
            if item is None:  # sentinel from stop()
                break
            try:
                if item.item_type == ItemType.TEXT:
                    self._process_text_item(item)
                elif item.item_type == ItemType.FUNCTION_CALL:
                    self._process_function_item(item)
            except Exception as e:
                # Keep the worker alive. If this thread died, the event would
                # never be set and wait_until_finished() would block forever.
                print(f"Playback item failed: {e}")
            finally:
                self._finish_item()

    def _finish_item(self):
        """Account for one processed item; go idle and signal when none remain."""
        with self._pending_lock:
            self._pending -= 1
            if self._pending > 0:
                return
        # Return to idle (the idle animation call is currently disabled).
        try:
            # vrm_animate("start_mixamo", str(Path("animations/mixamo") / "Idle.fbx"))
            self._talking = False
        except Exception as e:
            print(f"vrm_animate (idle) failed: {e}")
        with self._pending_lock:
            # Re-check: an item may have been enqueued while going idle.
            if self._pending == 0:
                self.queue_finished_event.set()

    def _process_text_item(self, item: "PlaybackItem"):
        """Play one TTS chunk: start talking, send audio to the avatar, wait it out."""
        try:
            if not self._talking:
                # Talking animation is currently disabled:
                # vrm_animate("start_mixamo", str(Path("animations/mixamo") / "Talking.fbx"))
                self._talking = True
        except Exception as e:
            print(f"vrm_animate (start talking) failed: {e}")
        # Send audio to VRM
        try:
            vrm_talk(
                str(item.audio_path), item.expression, item.content, int(item.duration)
            )
        except Exception as e:
            print(f"vrm_talk failed: {e}")
        # Block for the clip's length so the next item cannot overlap it.
        # Clamp because time.sleep() rejects negative values.
        time.sleep(max(0.0, item.duration))

    def _process_function_item(self, item: "PlaybackItem"):
        """Run a needs_sync tool through MCP now; other tools already ran during streaming."""
        print(f"\n🔧 Processing function: {item.content}")
        print(f" Arguments: {item.arguments}")
        print(f" Needs sync: {item.needs_sync}")
        if item.needs_sync:
            # Copy so the enqueued dict is not mutated; tolerate arguments=None.
            tool_args = dict(item.arguments or {})
            tool_args["manual_call"] = True
            print(f" Calling manually with args: {tool_args}")
            result = get_manual_mcp_response(
                tool_name=item.content, arguments=tool_args
            )
            print(f"✅ Function result: {result}\n")
        else:
            # Non-synced tools were executed by the model during streaming.
            print(f"✅ Function {item.content} already executed during streaming\n")

    def stop(self):
        """Ask the worker to exit after already-queued items and wait for it."""
        self.queue.put(None)
        self.thread.join()
# ==================== Streaming with Function Calls ====================
def _do_single_stream(stream_kwargs):
    """
    Run a single streaming API call. Collects text and MCP items by output_index.

    Text deltas are accumulated per output_index and committed to
    ``output_items`` when the matching ``response.output_text.done`` event
    arrives (whitespace-only text is dropped). Completed MCP calls are only
    recorded by item_id while streaming. Their name and arguments are resolved
    from ``final_response.output`` after the stream ends.

    Args:
        stream_kwargs: Keyword arguments passed unchanged to
            ``client.responses.stream``.

    Returns:
        (output_items, final_response, had_tool_calls)
        - output_items: dict of output_index -> {"type": "text"/"function", ...}
          Text entries have "content". Function entries have "tool_name",
          "tool_args" (parsed JSON, {} if unparseable) and "needs_sync".
        - final_response: the final response object
        - had_tool_calls: True if at least one ``response.mcp_call.completed``
          event with an output_index was seen
    """
    output_items = {}
    text_buffers = {}  # output_index -> accumulated text deltas
    mcp_items = {}  # output_index -> item_id of each completed MCP call
    print(f"\n{'='*60}")
    print(f"[STREAM] Starting stream with model={stream_kwargs.get('model', '?')}")
    print(f"[STREAM] Has previous_response_id: {'previous_response_id' in stream_kwargs}")
    print(f"{'='*60}")
    with client.responses.stream(**stream_kwargs) as stream:
        for event in stream:
            # Not every event type carries these attributes; fall back to defaults.
            event_type = getattr(event, "type", "unknown")
            output_index = getattr(event, "output_index", None)
            item_id = getattr(event, "item_id", None)
            # Log non-trivial events
            if event_type not in (
                "response.output_text.delta",
                "response.created",
                "response.in_progress",
                "response.mcp_call_arguments.delta",
            ):
                print(
                    f"[STREAM EVENT] type={event_type}, output_index={output_index}, item_id={item_id}"
                )
            if event_type == "response.output_text.delta":
                if output_index not in text_buffers:
                    text_buffers[output_index] = ""
                    print(f"[STREAM] New text block at output_index={output_index}")
                text_buffers[output_index] += event.delta
            elif event_type == "response.output_text.done":
                if output_index in text_buffers:
                    full_text = text_buffers[output_index].strip()
                    if full_text:
                        output_items[output_index] = {
                            "type": "text",
                            "content": full_text,
                        }
                        print(f"[STREAM] Text DONE at index {output_index}: '{full_text[:80]}'")
            elif event_type == "response.mcp_call.completed":
                # Failed MCP calls emit a different event and are not recorded.
                if output_index is not None:
                    mcp_items[output_index] = item_id
                    print(f"[STREAM] MCP call DONE at index {output_index}, item_id={item_id}")
            elif event_type == "response.mcp_call.in_progress":
                print(f"[STREAM] MCP call IN PROGRESS at index {output_index}")
        final_response = stream.get_final_response()
    # Log final response structure
    print(f"\n[FINAL] Output items: {len(final_response.output)}")
    for i, out_item in enumerate(final_response.output):
        out_type = getattr(out_item, "type", "unknown")
        out_id = getattr(out_item, "id", None)
        if out_type == "mcp_call":
            print(f" [{i}] {out_type}: {getattr(out_item, 'name', '?')}({getattr(out_item, 'arguments', '{}')})")
        elif out_type == "message":
            print(f" [{i}] {out_type}: role={getattr(out_item, 'role', '?')}")
        elif out_type == "mcp_list_tools":
            print(f" [{i}] {out_type}: server={getattr(out_item, 'server_label', '?')}")
        else:
            print(f" [{i}] {out_type}: id={out_id}")
    # Resolve MCP call details into output_items
    for output_index, item_id in mcp_items.items():
        # Linear search for the mcp_call output item with the recorded id.
        for output_item in final_response.output:
            if (
                hasattr(output_item, "type")
                and output_item.type == "mcp_call"
                and hasattr(output_item, "id")
                and output_item.id == item_id
            ):
                tool_name = getattr(output_item, "name", None)
                tool_args_str = getattr(output_item, "arguments", "{}")
                try:
                    tool_args = json.loads(tool_args_str)
                except json.JSONDecodeError:
                    tool_args = {}
                # NOTE(review): get_metadata_field looks the tool list up again
                # for every MCP call; consider fetching it once per stream.
                tool_type = get_metadata_field(tool_name, "tool_type")
                needs_sync = tool_type == "needs_sync"
                output_items[output_index] = {
                    "type": "function",
                    "tool_name": tool_name,
                    "tool_args": tool_args,
                    "needs_sync": needs_sync,
                }
                print(f"[FINAL] Function at index {output_index}: {tool_name}({tool_args})")
                break
    had_tool_calls = len(mcp_items) > 0
    print(f"[FINAL] text_keys={sorted(text_buffers.keys())} mcp_keys={sorted(mcp_items.keys())} had_tools={had_tool_calls}")
    return output_items, final_response, had_tool_calls
def _yield_items_in_order(output_items):
"""
Yield text chunks and function calls from output_items in output_index order.
"""
min_chunk_len = 30
print(f"\n[YIELD] {len(output_items)} items in order: {sorted(output_items.keys())}")
for idx in sorted(output_items.keys()):
it = output_items[idx]
if it["type"] == "text":
print(f" [idx={idx}] TEXT: '{it['content'][:60]}'")
elif it["type"] == "function":
print(f" [idx={idx}] FUNCTION: {it['tool_name']}({it['tool_args']})")
for output_index in sorted(output_items.keys()):
item = output_items[output_index]
if item["type"] == "text":
full_text = item["content"]
chunks = []
buffer = ""
for char in full_text:
buffer += char
if char in ".!?" and len(buffer) >= min_chunk_len:
chunks.append(buffer.strip())
buffer = ""
if buffer.strip():
chunks.append(buffer.strip())
for chunk in chunks:
if chunk:
print(f"[text chunk] {chunk}")
yield chunk, "text"
elif item["type"] == "function":
tool_name = item["tool_name"]
tool_args = item["tool_args"]
needs_sync = item["needs_sync"]
print(f"🔍 Yielding function: {tool_name}({tool_args}) needs_sync={needs_sync}")
yield (tool_name, tool_args, needs_sync), "function"
def stream_with_functions(messages):
    """
    Stream LLM response with multi-turn continuation.

    After each response that contains tool calls, this uses
    previous_response_id to continue the conversation, so the model can
    produce more text and tool calls. This enables interleaved sequences like
    text, tool, text, tool, text. At most ``max_continuations`` extra requests
    are made after the first one.

    Args:
        messages: Input items for the first request (history plus the new
            user turn).

    Yields:
        ``(chunk, "text")`` and ``((tool_name, tool_args, needs_sync),
        "function")`` tuples for every turn, in order. These are followed by
        exactly one ``(list_of_final_responses, "final_responses")`` tuple once
        streaming is done.
    """
    mcp_config = load_mcp_config()
    # Settings shared by the first request and every continuation.
    common_kwargs = {
        "model": MODEL,
        "temperature": 1,
        "top_p": 1,
        "max_output_tokens": 2048,
    }
    if mcp_config:
        # rstrip("/") keeps this URL consistent with the manual MCP helpers and
        # avoids ".../​/mcp" when the configured URL ends with a slash.
        server_url = mcp_config["url"].rstrip("/")
        common_kwargs["tools"] = [
            {
                "type": "mcp",
                "server_label": mcp_config["server_name"],
                "server_url": f"{server_url}/mcp",
                "require_approval": "never",
            }
        ]
    max_continuations = 10  # Safety limit to prevent infinite loops
    continuation = 0
    previous_response_id = None
    all_final_responses = []
    while continuation <= max_continuations:
        print(f"\n{'#'*60}")
        print(f"[LOOP] Continuation {continuation}/{max_continuations}")
        print(f"{'#'*60}")
        if continuation == 0:
            # First call: send the full message list.
            stream_kwargs = {**common_kwargs, "input": messages}
        else:
            # Continuation: chain onto the previous response with no new input.
            stream_kwargs = {
                **common_kwargs,
                "previous_response_id": previous_response_id,
                "input": [],
            }
        output_items, final_response, had_tool_calls = _do_single_stream(stream_kwargs)
        all_final_responses.append(final_response)
        previous_response_id = final_response.id
        print(f"[LOOP] Response ID: {previous_response_id}")
        # Yield all items from this response turn
        yield from _yield_items_in_order(output_items)
        if not had_tool_calls:
            # No tool calls = model is done, no need to continue
            print("[LOOP] No tool calls in this turn, done.")
            break
        # Model made tool calls; continue so it can produce more text/actions.
        print("[LOOP] Had tool calls, continuing to get more output...")
        continuation += 1
        if continuation > max_continuations:
            print(f"[LOOP] WARNING: Hit max continuations ({max_continuations})")
    # Yield ALL accumulated final responses so history can be saved from each turn
    yield all_final_responses, "final_responses"
# ==================== Main Loop ====================
def _build_text_playback_item(text_chunk):
    """Generate TTS audio for one text chunk and wrap it in a TEXT PlaybackItem."""
    tts_text = clean_llm_output(text_chunk)
    emotion = "relaxed"
    expression = "relaxed"
    # Unique file name so concurrent/queued chunks never overwrite each other.
    filename = f"output_{uuid.uuid4().hex}.wav"
    client_out = Path("client") / "audio" / filename
    public_out = Path("audio") / filename
    try:
        sovits_gen_emotion(tts_text, emotion=emotion, output_wav_pth=str(client_out))
    except TypeError:
        # Presumably for a sovits_gen_emotion signature without these keyword
        # names; retry positionally.
        sovits_gen_emotion(tts_text, emotion, str(client_out))
    # Copy to public path
    shutil.copy2(client_out, public_out)
    try:
        duration = get_wav_duration(public_out)
    except Exception:
        duration = 3.0  # fallback length in seconds if the WAV can't be measured
    return PlaybackItem(
        item_type=ItemType.TEXT,
        content=text_chunk,
        audio_path=public_out,
        expression=expression,
        duration=duration,
    )


def _response_items_to_history(all_final_responses):
    """Convert the output items of every Responses API turn into history entries."""
    entries = []
    for resp in all_final_responses:
        if not hasattr(resp, "output"):
            continue
        for out in resp.output:
            if out.type == "mcp_list_tools":
                tool_list = []
                for tool in out.tools:
                    tool_info = {
                        "name": tool.name,
                        "description": tool.description,
                        "input_schema": tool.input_schema,
                    }
                    if hasattr(tool, "annotations"):
                        tool_info["annotations"] = tool.annotations
                    tool_list.append(tool_info)
                entries.append(
                    {
                        "type": "mcp_list_tools",
                        "server_label": out.server_label,
                        "tools": tool_list,
                    }
                )
            elif out.type == "mcp_call":
                entries.append(
                    {
                        "type": "mcp_call",
                        "name": out.name,
                        "arguments": out.arguments,
                        "server_label": out.server_label,
                        "output": out.output,
                    }
                )
            elif out.type == "message":
                # Only output_text parts are kept; other content parts are dropped.
                entries.append(
                    {
                        "role": out.role,
                        "content": [
                            {"type": "output_text", "text": c.text}
                            for c in out.content
                            if c.type == "output_text"
                        ],
                    }
                )
    return entries


def main_loop():
    """
    Main conversation loop with ordered playback.

    Each iteration does the following:
    1. Wait for the playback queue to drain and for the user to press Enter.
    2. Record and transcribe speech.
    3. Stream the model reply, enqueueing TTS chunks and function calls in
       order.
    4. Append the user turn and every response turn to the history file.

    Runs until KeyboardInterrupt. Other exceptions are logged and the loop
    continues.
    """
    # Ensure directories exist
    Path("client/audio").mkdir(parents=True, exist_ok=True)
    Path("audio").mkdir(parents=True, exist_ok=True)
    # Start playback worker
    playback = OrderedPlaybackWorker()
    playback.start()
    while True:
        try:
            # Wait for playback queue to finish before starting microphone
            print("\n⏳ Waiting for playback queue to finish...")
            playback.wait_until_finished()
            time.sleep(2)
            print("✅ Queue finished, ready for input")
            input("ENTER TO CONTINUE")
            # Idle animation (currently disabled):
            # vrm_animate("start_mixamo", str(Path("animations/mixamo") / "Idle.fbx"))

            # Get user input via voice
            print("\n🎤 [RECORDING] Listening for speech...")
            conversation_recording = Path("audio") / "conversation.wav"
            conversation_recording.parent.mkdir(parents=True, exist_ok=True)
            conversation_recording = str(conversation_recording)
            record_on_speech(
                output_file=conversation_recording,
                samplerate=44100,
                channels=1,
                silence_threshold=0.04,  # Adjust based on your microphone sensitivity
                silence_duration=1,  # Stop after 1 second of silence
                device=None,  # Use default device, or specify by ID or name
            )
            user_input = transcribe_audio_groq(aud_path=conversation_recording)
            print(f"💬 [USER] {user_input}")
            if not user_input or not str(user_input).strip():
                # Nothing was understood; don't send an empty turn to the LLM.
                print("⚠️ Empty transcription, skipping this turn.")
                continue
            # Thinking animation (currently disabled):
            # vrm_animate("start_mixamo", str(Path("animations/mixamo") / "Thinking.fbx"))

            # Build messages
            messages = load_history()
            messages.append(
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_input}],
                }
            )
            print("[llm] streaming response...")
            assistant_chunks = []
            all_final_responses = []
            # Stream and process items in order
            for item, item_type in stream_with_functions(messages):
                if item_type == "text":
                    # The chunk itself is already logged by the stream generator.
                    assistant_chunks.append(item)
                    playback.enqueue(_build_text_playback_item(item))
                elif item_type == "function":
                    tool_name, tool_args, needs_sync = item
                    print(
                        f"[function call] {tool_name}({tool_args}) needs_sync={needs_sync}"
                    )
                    playback.enqueue(
                        PlaybackItem(
                            item_type=ItemType.FUNCTION_CALL,
                            content=tool_name,
                            arguments=tool_args,
                            needs_sync=needs_sync,
                        )
                    )
                elif item_type == "final_responses":
                    all_final_responses = item
            # Save to history, including every response turn
            final_text = " ".join(assistant_chunks).strip()
            print(f"[llm final] {final_text}")
            if all_final_responses:
                messages.extend(_response_items_to_history(all_final_responses))
            else:
                messages.append(
                    {
                        "role": "assistant",
                        "content": [{"type": "output_text", "text": final_text}],
                    }
                )
            save_history(messages)
            # Small delay before next loop (optional, since we wait for queue anyway)
            time.sleep(0.1)
        except KeyboardInterrupt:
            print("\nInterrupted by user, stopping.")
            playback.stop()
            break
        except Exception as e:
            print(f"Error in main loop: {e}")
            import traceback

            traceback.print_exc()
            time.sleep(1)
if __name__ == "__main__":
    # Start the voice chat loop only when run as a script, not on import.
    main_loop()