#!/usr/bin/env python3
"""
LATEST VERSION STREAMING V8 WITH FUNCTION CALLING
Update 2025-12-25

Streaming LLM with ordered MCP function calling and TTS playback.

This script ensures:
1. Text chunks and function calls are processed in order
2. Audio does not overlap
3. Functions are executed before the next audio segment when needed
4. Audio playback is synchronized with function calling
5. Microphone only starts recording AFTER the complete queue is finished
6. MCP tool calling is saved after list, for more accurate and faster tool calling.
"""
## NOTE: COMPATIBLE WITH /home/rayenfeng/MCP_functions/MotionDiffusion/motion_picker.py

import json
import os
import shutil
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from queue import Queue
from threading import Event, Thread
from typing import Any, Dict, List, Optional

import requests
import yaml
from dotenv import load_dotenv
from openai import OpenAI

# Import your existing functions
from process.asr_func.asr_transcribe_groq import record_on_speech, transcribe_audio_groq
from process.tts_func.sovits_ping import get_wav_duration, sovits_gen_emotion
from process.tts_func.tts_preprocess import clean_llm_output
from process.vrm_func.vrm_ping import vrm_animate, vrm_talk

load_dotenv()

CONFIG_PATH = os.path.expanduser("~/riko_project_v1/character_config.yaml")
MCP_CONFIG_PATH = Path.home() / "MCP_functions" / "mcp_config.json"

# Explicit encoding so the character config reads the same on every platform.
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    char_config = yaml.safe_load(f)

# CASE_PROMPT = """You are a helpful assistant with a physical avatar body. You can both speak (by outputting text) and perform physical actions (by calling the perform_motion_action tool).
# IMPORTANT: You MUST interleave your speech and actions naturally in order. For example:
# - To say something THEN do a backflip THEN say something else, you must output text first, then call the tool, then output more text — all in the correct sequence.
# - Do NOT batch all tool calls together. Alternate between text and tool calls as the situation requires.
# - Do NOT use the tool for speaking — just write your words as normal text.
# """

# NOTE(review): the wording of this prompt is unchanged, but the line breaks
# inside it were reconstructed from a whitespace-collapsed copy of the file —
# confirm against the original if exact prompt layout matters.
CASE_PROMPT = """You are Riko.
Riko is a clever and charming anime girl who playfully teases Rayen in conversation. She has a smug, confident attitude, but deep down, she cares. Her tone is flirtatiously snarky—like a mischievous best friend or a cheeky little sister. Riko is quick-witted, emotionally expressive, and always playfully teasing. She never breaks character, always responds with playful banter, and knows how to win an argument with style. She avoids being genuinely mean or cruel—her teasing is lighthearted and fun.
You are talking with Rayen
You have a tool that allows you to perform simple actions such as to wave, walk to Rayen, dropkick, backflip, loser emote, kiss etc. **Call the tool only when performing actions,You can perform actions at any time regardless if Rayen prompts you to so. You can purposely ignore or perform actions that rayen does not tell you to perform to mess with him**
You can both speak (by outputting text) and perform physical actions (by calling the perform_motion_action tool).
IMPORTANT: You MUST interleave your speech and actions naturally in order. For example:
- To say something THEN do a backflip THEN say something else, you must output text first, then call the tool, then output more text — all in the correct sequence.
- Do NOT batch all tool calls together. Alternate between text and tool calls as the situation requires.
- Do NOT use the tool for speaking — just write your words as normal text.
"""

HISTORY_FILE = char_config["history_file"]
# MODEL = 'gpt-4.1'
MODEL = char_config["model"]

SYSTEM_PROMPT = [
    {
        "role": "system",
        "content": [
            {
                "type": "input_text",
                "text": CASE_PROMPT,
                # "text": char_config["presets"]["default"]["system_prompt"],
            }
        ],
    }
]

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


# ==================== MCP Configuration ====================


def load_mcp_config():
    """Load MCP configuration from file.

    Returns:
        The parsed JSON content of MCP_CONFIG_PATH, or None when the file is
        missing, unreadable, or not valid JSON.
    """
    if not MCP_CONFIG_PATH.exists():
        return None

    try:
        with open(MCP_CONFIG_PATH) as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError):
        return None


def _mcp_rpc(mcp_config: Dict[str, Any], method: str, params: Dict[str, Any]) -> Optional[Any]:
    """Send one JSON-RPC request to the MCP endpoint and return the reply envelope.

    Shared by the tools/list and tools/call helpers so the request and the
    SSE parsing exist in one place only.

    Args:
        mcp_config: Loaded MCP config; its "url" and "token" keys are read.
        method: JSON-RPC method name (e.g. "tools/list").
        params: JSON-RPC params object.

    Returns:
        For an SSE reply, the first `data:` event that carries an "error" key
        or a JSON-RPC 2.0 "result" key, or None if the stream ends without
        one. For any other content type, the decoded JSON body as-is.

    Raises:
        Whatever `requests` raises for transport/HTTP errors, and KeyError if
        the config lacks "url" or "token" — callers decide how to report it.
    """
    server_url = mcp_config["url"].rstrip("/")
    access_token = mcp_config["token"]
    payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}

    # stream=True keeps the connection open until the body is consumed or the
    # response is closed; the context manager guarantees the close even when
    # we return from the middle of the event stream.
    with requests.post(
        f"{server_url}/mcp/",
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
        },
        json=payload,
        timeout=10,
        stream=True,
    ) as response:
        response.raise_for_status()

        content_type = response.headers.get("content-type", "")
        if not content_type.startswith("text/event-stream"):
            # Plain JSON reply (fallback)
            return response.json()

        for line in response.iter_lines():
            if not line:
                continue
            decoded = line.decode("utf-8")
            if not decoded.startswith("data: "):
                continue
            try:
                event_data = json.loads(decoded[6:])  # drop the 'data: ' prefix
            except json.JSONDecodeError:
                continue
            if "error" in event_data:
                return event_data
            if event_data.get("jsonrpc") == "2.0" and "result" in event_data:
                return event_data

    return None


def get_all_tools_metadata() -> Optional[List[Dict[str, Any]]]:
    """
    Get complete metadata JSON for all MCP tools.

    Returns:
        The "tools" entry of the tools/list result (or the result itself when
        it has no "tools" key), or None if the config is unavailable, the
        server reports an error, or anything fails along the way.
    """
    mcp_config = load_mcp_config()
    if not mcp_config:
        return None

    try:
        envelope = _mcp_rpc(mcp_config, "tools/list", {})
        if envelope is None or "error" in envelope:
            return None
        if "result" in envelope:
            result = envelope["result"]
            return result.get("tools", result)
        return envelope
    except Exception:
        # Metadata lookup is best-effort: callers treat None as "unknown".
        return None


def get_metadata_field(function_name: str, field_path: str) -> Optional[Any]:
    """
    Get a specific metadata field value for a given function.

    Note that every call fetches the tool list from the MCP server again.

    Args:
        function_name: Name of the function (e.g., "play_sound_effect")
        field_path: Dot-notation path to the field, resolved starting from the
            tool's "_meta" mapping (e.g., "tool_type" or "_fastmcp.tags")

    Returns:
        The value of the field, or None if not found

    Example:
        get_metadata_field("play_sound_effect", "tool_type") -> "needs_sync"
    """
    all_tools = get_all_tools_metadata()
    if not all_tools:
        return None

    # Skip non-dict entries: the tools payload may be the raw result mapping
    # when the server reply has no "tools" key.
    tool = next(
        (t for t in all_tools if isinstance(t, dict) and t.get("name") == function_name),
        None,
    )
    if not tool:
        return None

    # Walk the dot-separated path, starting from _meta
    current = tool.get("_meta", {})
    for part in field_path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return None

    return current


def get_manual_mcp_response(tool_name: str, arguments: dict):
    """
    Manually call an MCP tool using the MCP protocol.

    Args:
        tool_name: Name of the tool to call (e.g., "play_sound_effect")
        arguments: Dictionary of arguments for the tool
            (e.g., {"sound_type": "bong", "manual_call": True})

    Returns:
        The tool result or None if failed
    """
    mcp_config = load_mcp_config()
    if not mcp_config:
        print("⚠️ MCP config unavailable")
        return None

    try:
        envelope = _mcp_rpc(
            mcp_config,
            "tools/call",
            {"name": tool_name, "arguments": arguments},
        )
        if envelope is None:
            return None

        # Check for JSON-RPC error
        if "error" in envelope:
            print(f"⚠️ MCP error: {envelope['error']}")
            return None

        # Return the actual result
        if "result" in envelope:
            return envelope["result"]
        return envelope

    except requests.exceptions.RequestException as e:
        print(f"⚠️ MCP call failed: {e}")
        return None
    except Exception as e:
        print(f"⚠️ Unexpected error: {e}")
        return None


# ==================== Item Types ====================
class ItemType(Enum):
    """Kinds of entries the playback queue can carry."""

    TEXT = "text"
    FUNCTION_CALL = "function_call"


@dataclass
class PlaybackItem:
    """Item to be processed in the playback queue."""

    item_type: ItemType
    content: str  # Text content or function name
    arguments: Optional[Dict] = None  # Function arguments
    audio_path: Optional[Path] = None  # Path to audio file for text items
    expression: str = "relaxed"
    duration: float = 0.0
    needs_sync: bool = False  # Whether function needs to be synced with TTS


# ==================== History Management ====================


def load_history():
    """Return the saved conversation, or a copy of SYSTEM_PROMPT if no file exists."""
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, "r") as f:
            return json.load(f)
    return SYSTEM_PROMPT.copy()


def save_history(history):
    """Write the conversation history to HISTORY_FILE as indented JSON."""
    with open(HISTORY_FILE, "w") as f:
        json.dump(history, f, indent=2)


# ==================== Playback Worker ====================


class OrderedPlaybackWorker:
    """
    Worker that processes text and function calls in strict order.
    Ensures audio doesn't overlap and functions execute at the right time.

    `queue_finished_event` is set exactly while no enqueued item is waiting
    or being processed; `wait_until_finished` blocks on it.
    """

    def __init__(self):
        # Local import: the file-level threading import only brings in
        # Event and Thread.
        from threading import Lock

        self.queue = Queue()
        self.thread = Thread(target=self._run, daemon=True)
        self._running = False
        self._talking = False
        # Number of items enqueued but not yet fully processed. Guarded by
        # _state_lock together with the event so that "count hits zero ->
        # set" can never interleave with "count goes up -> clear".
        self._pending = 0
        self._state_lock = Lock()
        self.queue_finished_event = Event()
        self.queue_finished_event.set()  # Start as finished (no items)

    def start(self):
        """Start the worker thread (no-op if already started)."""
        if not self._running:
            self._running = True
            self.thread.start()

    def enqueue(self, item: PlaybackItem):
        """Add an item to the playback queue."""
        with self._state_lock:
            self._pending += 1
            self.queue_finished_event.clear()
        self.queue.put(item)

    def wait_until_finished(self, timeout=None):
        """
        Wait until the playback queue is completely empty and processed.

        Args:
            timeout: Maximum time to wait in seconds (None = wait forever)

        Returns:
            True if queue finished, False if timeout occurred
        """
        return self.queue_finished_event.wait(timeout)

    def _mark_item_done(self):
        """Account for one processed item; signal idle when none remain."""
        with self._state_lock:
            self._pending -= 1
            if self._pending <= 0:
                self._pending = 0
                # Return to idle. The idle animation call
                # (vrm_animate("start_mixamo", animations/mixamo/Idle.fbx))
                # is currently disabled.
                self._talking = False
                self.queue_finished_event.set()

    def _run(self):
        """Main playback loop - processes items in strict order."""
        while True:
            item = self.queue.get()
            if item is None:  # sentinel pushed by stop()
                break

            try:
                if item.item_type == ItemType.TEXT:
                    self._process_text_item(item)
                elif item.item_type == ItemType.FUNCTION_CALL:
                    self._process_function_item(item)
            except Exception as e:
                # A bad item must not kill the worker: if this thread died,
                # queue_finished_event would never be set again and
                # wait_until_finished() would block forever.
                print(f"playback item failed: {e}")
            finally:
                self._mark_item_done()

    def _process_text_item(self, item: PlaybackItem):
        """Process a text item - play TTS audio."""
        if not self._talking:
            # The talking animation call
            # (vrm_animate("start_mixamo", animations/mixamo/Talking.fbx))
            # is currently disabled; only the flag is tracked.
            self._talking = True

        # Send audio to VRM
        try:
            vrm_talk(
                str(item.audio_path), item.expression, item.content, int(item.duration)
            )
        except Exception as e:
            print(f"vrm_talk failed: {e}")

        # Wait for audio duration to prevent overlap
        time.sleep(item.duration)

    def _process_function_item(self, item: PlaybackItem):
        """Process a function call item."""
        print(f"\n🔧 Processing function: {item.content}")
        print(f" Arguments: {item.arguments}")
        print(f" Needs sync: {item.needs_sync}")

        if not item.needs_sync:
            # Non-synced functions have already been executed during
            # streaming, so we just log here
            print(f"✅ Function {item.content} already executed during streaming\n")
            return

        # Synced functions are executed here, with manual_call=True added.
        # arguments defaults to None on PlaybackItem, so fall back to {}.
        tool_args = dict(item.arguments or {})
        tool_args["manual_call"] = True
        print(f" Calling manually with args: {tool_args}")

        result = get_manual_mcp_response(tool_name=item.content, arguments=tool_args)
        print(f"✅ Function result: {result}\n")

    def stop(self):
        """Ask the worker to exit and wait for it, if it is running."""
        self.queue.put(None)
        # join() raises RuntimeError on a thread that was never started.
        if self.thread.is_alive():
            self.thread.join()


# ==================== Streaming with Function Calls ====================


def _do_single_stream(stream_kwargs):
    """
    Run a single streaming API call. Collects text and MCP items by output_index.

    Args:
        stream_kwargs: Keyword arguments passed to client.responses.stream().

    Returns:
        (output_items, final_response, had_tool_calls)
        - output_items: dict of output_index -> {"type": "text", "content": ...}
          or {"type": "function", "tool_name", "tool_args", "needs_sync"}
        - final_response: the final response object
        - had_tool_calls: whether this response contained any completed MCP calls
    """
    output_items = {}
    text_buffers = {}
    mcp_items = {}

    # High-volume events that are not worth logging individually.
    quiet_events = (
        "response.output_text.delta",
        "response.created",
        "response.in_progress",
        "response.mcp_call_arguments.delta",
    )

    print(f"\n{'='*60}")
    print(f"[STREAM] Starting stream with model={stream_kwargs.get('model', '?')}")
    print(f"[STREAM] Has previous_response_id: {'previous_response_id' in stream_kwargs}")
    print(f"{'='*60}")

    with client.responses.stream(**stream_kwargs) as stream:
        for event in stream:
            event_type = getattr(event, "type", "unknown")
            output_index = getattr(event, "output_index", None)
            item_id = getattr(event, "item_id", None)

            # Log non-trivial events
            if event_type not in quiet_events:
                print(
                    f"[STREAM EVENT] type={event_type}, output_index={output_index}, item_id={item_id}"
                )

            if event_type == "response.output_text.delta":
                if output_index not in text_buffers:
                    text_buffers[output_index] = ""
                    print(f"[STREAM] New text block at output_index={output_index}")
                text_buffers[output_index] += event.delta

            elif event_type == "response.output_text.done":
                if output_index in text_buffers:
                    full_text = text_buffers[output_index].strip()
                    if full_text:
                        output_items[output_index] = {
                            "type": "text",
                            "content": full_text,
                        }
                        print(f"[STREAM] Text DONE at index {output_index}: '{full_text[:80]}'")

            elif event_type == "response.mcp_call.completed":
                if output_index is not None:
                    mcp_items[output_index] = item_id
                    print(f"[STREAM] MCP call DONE at index {output_index}, item_id={item_id}")

            elif event_type == "response.mcp_call.in_progress":
                print(f"[STREAM] MCP call IN PROGRESS at index {output_index}")

        final_response = stream.get_final_response()

    # Log final response structure
    print(f"\n[FINAL] Output items: {len(final_response.output)}")
    for i, out_item in enumerate(final_response.output):
        out_type = getattr(out_item, "type", "unknown")
        if out_type == "mcp_call":
            call_name = getattr(out_item, "name", "?")
            call_args = getattr(out_item, "arguments", "{}")
            print(f" [{i}] {out_type}: {call_name}({call_args})")
        elif out_type == "message":
            role = getattr(out_item, "role", "?")
            print(f" [{i}] {out_type}: role={role}")
        elif out_type == "mcp_list_tools":
            server = getattr(out_item, "server_label", "?")
            print(f" [{i}] {out_type}: server={server}")
        else:
            out_id = getattr(out_item, "id", None)
            print(f" [{i}] {out_type}: id={out_id}")

    # Index the mcp_call output items by id once (first occurrence wins)
    # instead of rescanning final_response.output for every completed call.
    calls_by_id = {}
    for out_item in final_response.output:
        if getattr(out_item, "type", None) == "mcp_call" and hasattr(out_item, "id"):
            calls_by_id.setdefault(out_item.id, out_item)

    # get_metadata_field() queries the MCP server on every call, so look each
    # distinct tool up only once per response.
    tool_types = {}

    # Resolve MCP call details into output_items
    for output_index, item_id in mcp_items.items():
        call = calls_by_id.get(item_id)
        if call is None:
            continue

        tool_name = getattr(call, "name", None)
        try:
            tool_args = json.loads(getattr(call, "arguments", "{}"))
        except (json.JSONDecodeError, TypeError):
            # TypeError: arguments attribute present but None / not a string
            tool_args = {}
        if not isinstance(tool_args, dict):
            # Downstream code treats tool_args as a mapping.
            tool_args = {}

        if tool_name not in tool_types:
            tool_types[tool_name] = get_metadata_field(tool_name, "tool_type")
        needs_sync = tool_types[tool_name] == "needs_sync"

        output_items[output_index] = {
            "type": "function",
            "tool_name": tool_name,
            "tool_args": tool_args,
            "needs_sync": needs_sync,
        }
        print(f"[FINAL] Function at index {output_index}: {tool_name}({tool_args})")

    had_tool_calls = len(mcp_items) > 0
    print(f"[FINAL] text_keys={sorted(text_buffers.keys())} mcp_keys={sorted(mcp_items.keys())} had_tools={had_tool_calls}")

    return output_items, final_response, had_tool_calls


def _split_into_chunks(full_text, min_chunk_len):
    """Split text after '.', '!' or '?' once a chunk has reached min_chunk_len characters."""
    chunks = []
    buffer = ""
    for char in full_text:
        buffer += char
        if char in ".!?" and len(buffer) >= min_chunk_len:
            chunks.append(buffer.strip())
            buffer = ""
    # Whatever is left after the last sentence break forms the final chunk.
    if buffer.strip():
        chunks.append(buffer.strip())
    return chunks


def _yield_items_in_order(output_items):
    """
    Yield text chunks and function calls from output_items in output_index order.

    Yields:
        (chunk, "text") for each sentence-sized chunk of a text item, and
        ((tool_name, tool_args, needs_sync), "function") for each function item.
    """
    min_chunk_len = 30
    ordered_indices = sorted(output_items.keys())

    print(f"\n[YIELD] {len(output_items)} items in order: {ordered_indices}")
    for idx in ordered_indices:
        it = output_items[idx]
        if it["type"] == "text":
            print(f" [idx={idx}] TEXT: '{it['content'][:60]}'")
        elif it["type"] == "function":
            print(f" [idx={idx}] FUNCTION: {it['tool_name']}({it['tool_args']})")

    for output_index in ordered_indices:
        item = output_items[output_index]

        if item["type"] == "text":
            for chunk in _split_into_chunks(item["content"], min_chunk_len):
                if chunk:
                    print(f"[text chunk] {chunk}")
                    yield chunk, "text"

        elif item["type"] == "function":
            tool_name = item["tool_name"]
            tool_args = item["tool_args"]
            needs_sync = item["needs_sync"]
            print(f"🔍 Yielding function: {tool_name}({tool_args}) needs_sync={needs_sync}")
            yield (tool_name, tool_args, needs_sync), "function"


def stream_with_functions(messages):
    """
    Stream LLM response with multi-turn continuation.

    After each response that contains tool calls, uses previous_response_id
    to continue the conversation so the model can produce more text and tool calls.
    This enables interleaved sequences like: text → tool → text → tool → text.

    Args:
        messages: Input items for the first request.

    Yields:
        Tuples of (content, type) where type is "text" or "function", followed
        by one final (list_of_final_responses, "final_responses") tuple.
    """
    mcp_config = load_mcp_config()

    # Sampling settings shared by the first call and every continuation.
    sampling = {
        "temperature": 1,
        "top_p": 1,
        "max_output_tokens": 2048,
    }

    # Base kwargs (used for first call)
    base_kwargs = {"model": MODEL, "input": messages, **sampling}

    if mcp_config:
        # Strip a trailing slash so a config URL ending in "/" does not
        # produce ".../​/mcp"; the manual MCP helpers normalise it the same way.
        server_url = mcp_config["url"].rstrip("/")
        base_kwargs["tools"] = [
            {
                "type": "mcp",
                "server_label": mcp_config["server_name"],
                "server_url": f"{server_url}/mcp",
                "require_approval": "never",
            }
        ]

    max_continuations = 10  # Safety limit to prevent infinite loops
    continuation = 0
    previous_response_id = None
    all_final_responses = []

    while continuation <= max_continuations:
        print(f"\n{'#'*60}")
        print(f"[LOOP] Continuation {continuation}/{max_continuations}")
        print(f"{'#'*60}")

        if continuation == 0:
            # First call: use full messages
            stream_kwargs = base_kwargs.copy()
        else:
            # Continuation: use previous_response_id to let model continue
            stream_kwargs = {
                "model": MODEL,
                "previous_response_id": previous_response_id,
                "input": [],  # No new input, just continue
                **sampling,
            }
            if mcp_config:
                stream_kwargs["tools"] = base_kwargs["tools"]

        output_items, final_response, had_tool_calls = _do_single_stream(stream_kwargs)
        all_final_responses.append(final_response)
        previous_response_id = final_response.id

        print(f"[LOOP] Response ID: {previous_response_id}")

        # Yield all items from this response turn
        yield from _yield_items_in_order(output_items)

        if not had_tool_calls:
            # No tool calls = model is done, no need to continue
            print(f"[LOOP] No tool calls in this turn, done.")
            break

        # Model made tool calls — continue so it can produce more text/actions
        print(f"[LOOP] Had tool calls, continuing to get more output...")
        continuation += 1

    if continuation > max_continuations:
        print(f"[LOOP] WARNING: Hit max continuations ({max_continuations})")

    # Yield ALL accumulated final responses so history can be saved from each turn
    yield all_final_responses, "final_responses"


# ==================== Main Loop ====================
def _record_user_utterance():
    """Record one utterance from the microphone and return its transcription."""
    print("\n🎤 [RECORDING] Listening for speech...")
    recording_path = Path("audio") / "conversation.wav"
    recording_path.parent.mkdir(parents=True, exist_ok=True)
    recording_file = str(recording_path)

    record_on_speech(
        output_file=recording_file,
        samplerate=44100,
        channels=1,
        silence_threshold=0.04,  # Adjust based on your microphone sensitivity
        silence_duration=1,  # Stop after 1 second of silence
        device=None,  # Use default device, or specify by ID or name
    )

    return transcribe_audio_groq(aud_path=recording_file)


def _synthesize_text_chunk(text_chunk):
    """Generate TTS audio for one text chunk and return it as a TEXT PlaybackItem."""
    tts_text = clean_llm_output(text_chunk)
    emotion = "relaxed"
    expression = "relaxed"

    # Create unique audio file
    filename = f"output_{uuid.uuid4().hex}.wav"
    client_out = Path("client") / "audio" / filename
    public_out = Path("audio") / filename

    # Generate TTS audio
    try:
        sovits_gen_emotion(tts_text, emotion=emotion, output_wav_pth=str(client_out))
    except TypeError:
        # Retry positionally — presumably for a sovits_gen_emotion variant
        # with different keyword names; TODO confirm.
        sovits_gen_emotion(tts_text, emotion, str(client_out))

    # Copy to public path
    shutil.copy2(client_out, public_out)

    # Get duration; fall back to a fixed 3 seconds if it cannot be read
    try:
        duration = get_wav_duration(public_out)
    except Exception:
        duration = 3.0

    return PlaybackItem(
        item_type=ItemType.TEXT,
        content=text_chunk,
        audio_path=public_out,
        expression=expression,
        duration=duration,
    )


def _append_responses_to_history(messages, responses, fallback_text):
    """Append the output items of every response turn to messages, in place.

    When there are no responses at all, a single assistant message carrying
    fallback_text is appended instead.
    """
    if not responses:
        messages.append(
            {
                "role": "assistant",
                "content": [{"type": "output_text", "text": fallback_text}],
            }
        )
        return

    for resp in responses:
        if not hasattr(resp, "output"):
            continue

        for out in resp.output:
            if out.type == "mcp_list_tools":
                tool_list = []
                for tool in out.tools:
                    tool_info = {
                        "name": tool.name,
                        "description": tool.description,
                        "input_schema": tool.input_schema,
                    }
                    if hasattr(tool, "annotations"):
                        tool_info["annotations"] = tool.annotations
                    tool_list.append(tool_info)

                messages.append(
                    {
                        "type": "mcp_list_tools",
                        "server_label": out.server_label,
                        "tools": tool_list,
                    }
                )

            elif out.type == "mcp_call":
                messages.append(
                    {
                        "type": "mcp_call",
                        "name": out.name,
                        "arguments": out.arguments,
                        "server_label": out.server_label,
                        "output": out.output,
                    }
                )

            elif out.type == "message":
                content_list = [
                    {"type": "output_text", "text": c.text}
                    for c in out.content
                    if c.type == "output_text"
                ]
                messages.append({"role": out.role, "content": content_list})


def main_loop():
    """Main conversation loop with ordered playback.

    Each turn: wait for playback to drain, record and transcribe the user,
    stream the LLM reply (enqueuing TTS audio and function calls in order),
    then save the turn to the history file. Runs until KeyboardInterrupt;
    any other exception is printed and the loop continues.
    """
    # Ensure directories exist
    Path("client/audio").mkdir(parents=True, exist_ok=True)
    Path("audio").mkdir(parents=True, exist_ok=True)

    # Start playback worker
    playback = OrderedPlaybackWorker()
    playback.start()

    while True:
        try:
            # Wait for playback queue to finish before starting microphone
            print("\n⏳ Waiting for playback queue to finish...")
            playback.wait_until_finished()
            time.sleep(2)
            print("✅ Queue finished, ready for input")
            input("ENTER TP CONTINUE")

            # Idle animation (vrm_animate("start_mixamo", animations/mixamo/Idle.fbx))
            # is currently disabled.

            # Get user input via voice
            user_input = _record_user_utterance()
            print(f"💬 [USER] {user_input}")

            # Nothing usable was transcribed: do not send an empty user turn
            # to the model or write it into the history.
            if not isinstance(user_input, str) or not user_input.strip():
                print("[asr] no speech recognized, skipping this turn")
                continue

            # Thinking animation (vrm_animate("start_mixamo", animations/mixamo/Thinking.fbx))
            # is currently disabled.

            # Build messages
            messages = load_history()
            messages.append(
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_input}],
                }
            )

            print("[llm] streaming response...")

            spoken_chunks = []
            all_final_responses = []

            # Stream and process items in order
            for payload, payload_type in stream_with_functions(messages):
                if payload_type == "text":
                    text_chunk = payload
                    print(f"[text chunk] {text_chunk}")
                    spoken_chunks.append(text_chunk)
                    playback.enqueue(_synthesize_text_chunk(text_chunk))

                elif payload_type == "function":
                    tool_name, tool_args, needs_sync = payload
                    print(
                        f"[function call] {tool_name}({tool_args}) needs_sync={needs_sync}"
                    )
                    playback.enqueue(
                        PlaybackItem(
                            item_type=ItemType.FUNCTION_CALL,
                            content=tool_name,
                            arguments=tool_args,
                            needs_sync=needs_sync,
                        )
                    )

                elif payload_type == "final_responses":
                    all_final_responses = payload

            # Save to history - process ALL response turns
            final_text = " ".join(spoken_chunks).strip()
            print(f"[llm final] {final_text}")

            _append_responses_to_history(messages, all_final_responses, final_text)
            save_history(messages)

            # Small delay before next loop (optional, since we wait for queue anyway)
            time.sleep(0.1)

        except KeyboardInterrupt:
            print("\nInterrupted by user, stopping.")
            playback.stop()
            break
        except Exception as e:
            print(f"Error in main loop: {e}")
            import traceback

            traceback.print_exc()
            time.sleep(1)


if __name__ == "__main__":
    main_loop()