#!/usr/bin/env python3
"""
main_chat_v9.py - voice loop with streaming LLM.

Modified for OpenRouter / Classic Chat Completions API.
"""

import json
import os
import shutil
import time
import uuid
import math
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from queue import Queue
from threading import Event, Lock, Thread
from typing import Any, Dict, List, Optional

import requests
import yaml
from dotenv import load_dotenv
from openai import OpenAI

from process.asr_func.asr_transcribe_groq import record_on_speech, transcribe_audio_groq
from process.tts_func.sovits_ping import get_wav_duration
from process.tts_func.elevenlabs_ping import elevenlabs_gen
from process.tts_func.tts_preprocess import clean_llm_output
from process.vrm_func.vrm_ping import vrm_animate, vrm_talk
from process.vrm_func.vrm_states_ping import set_vrm_state

# Server URL for fetching pending user actions (clicks).
SERVER_BASE_URL = "http://localhost:8001"

load_dotenv()

# ==================== Paths & Config ====================

PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Reads the persona selected in the GUI launcher. Falls back to 'characters/hakuro.yaml'.
persona_file = os.getenv("ACTIVE_PERSONA", "characters/hakuro.yaml")
CONFIG_PATH = PROJECT_ROOT / persona_file

MCP_CONFIG_PATH = Path(
    os.getenv("MCP_CONFIG_PATH", str(Path.home() / "MCP_functions" / "mcp_config.json"))
)

if not CONFIG_PATH.exists():
    # The persona file is configurable, so report the actual path instead of a fixed filename.
    raise FileNotFoundError(f"[config] persona config not found at {CONFIG_PATH}")

with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    char_config = yaml.safe_load(f)

HISTORY_FILE = char_config["history_file"]
MODEL = char_config.get("model", "model")
BASE_SYSTEM_PROMPT = char_config["presets"]["default"]["system_prompt"]

# Appended to the system prompt by _initial_system_message only when MCP is
# active, so the model knows to interleave speech and tool calls.
TOOL_USE_RULES = """
You also have access to a tool that lets you perform physical actions. Call the tool only when performing actions.
You can both speak (by outputting text) and perform physical actions (by calling the tool).
"""

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise EnvironmentError("[config] Please set OPENAI_API_KEY in your environment")

# NOTE(review): no base_url is passed; the OpenAI SDK falls back to the
# OPENAI_BASE_URL environment variable, which is presumably how OpenRouter is
# targeted - confirm.
client = OpenAI(api_key=OPENAI_API_KEY)


# ==================== emotional translator =================

# Ordered (expression, markers) table: the first group with any marker present wins.
_EXPRESSION_MARKERS = (
    ("happy", ('*giggle', '*laugh', '*smile', '*smirk', '*cheer', '[laughs]')),
    ("angry", ('*angry', '*glare', '*mad', '*frustrat')),
    ("sad", ('*sad', '*cry', '*sigh', '*sorrow', '[sigh]')),
    ("surprised", ('*gasp', '*shock', '*surpris', '*wide', '[gasps]')),
    ("neutral", ('*neutral', '*stare')),
)


def get_vrm_expression(text: str) -> str:
    """Map stage directions in *text* (e.g. ``*giggle*``, ``[sigh]``) to a VRM expression.

    Matching is case-insensitive substring search. Groups are checked in the
    order of _EXPRESSION_MARKERS. Returns "relaxed" when nothing matches.
    """
    text_lower = text.lower()
    for expression, markers in _EXPRESSION_MARKERS:
        if any(marker in text_lower for marker in markers):
            return expression
    return "relaxed"


# ==================== MCP Optional Layer ====================

def load_mcp_config() -> Optional[dict]:
    """Load the MCP connection config from MCP_CONFIG_PATH.

    Returns the parsed JSON (``_mcp_jsonrpc`` reads its 'url' and 'token'
    keys). Returns None when the file is missing or cannot be read or parsed.
    """
    if not MCP_CONFIG_PATH.exists():
        return None
    try:
        with open(MCP_CONFIG_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        print(f"[mcp] failed to load config at {MCP_CONFIG_PATH}: {e}")
        return None


def is_mcp_available() -> bool:
    """Return True when an MCP config file exists and parses."""
    return load_mcp_config() is not None


def _first_sse_result(resp: requests.Response, method: str) -> Optional[Any]:
    """Return the ``result`` of the first JSON-RPC event in a text/event-stream body.

    Only ``data: `` lines are considered, and lines that are not valid JSON
    are skipped. Returns None on a JSON-RPC error or when the stream ends
    without a result.
    """
    for line in resp.iter_lines():
        if not line:
            continue
        decoded = line.decode("utf-8")
        if not decoded.startswith("data: "):
            continue
        try:
            event_data = json.loads(decoded[6:])
        except json.JSONDecodeError:
            continue
        if "error" in event_data:
            print(f"[mcp] error from {method}: {event_data['error']}")
            return None
        if event_data.get("jsonrpc") == "2.0" and "result" in event_data:
            return event_data["result"]
    return None


def _mcp_jsonrpc(method: str, params: dict) -> Optional[Any]:
    """POST a JSON-RPC 2.0 request to ``<url>/mcp/`` and return its ``result``.

    Handles both plain JSON and server-sent-event responses. Returns None when
    MCP is not configured, on a JSON-RPC error, or on any transport/parse
    failure; the cause is printed.
    """
    cfg = load_mcp_config()
    if not cfg:
        return None
    try:
        endpoint = f"{cfg['url'].rstrip('/')}/mcp/"
        # stream=True lets SSE bodies be read incrementally. The context manager
        # guarantees the connection is released, including on early return.
        with requests.post(
            endpoint,
            headers={
                "Authorization": f"Bearer {cfg['token']}",
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream",
            },
            json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
            timeout=10,
            stream=True,
        ) as resp:
            resp.raise_for_status()
            if resp.headers.get("content-type", "").startswith("text/event-stream"):
                return _first_sse_result(resp, method)
            body = resp.json()

        if "error" in body:
            print(f"[mcp] error from {method}: {body['error']}")
            return None
        return body.get("result", body)
    except Exception as e:
        print(f"[mcp] {method} failed: {e}")
        return None


def get_all_tools_metadata() -> Optional[List[Dict[str, Any]]]:
    """Return the MCP ``tools/list`` entries, or None if the call failed or was empty."""
    result = _mcp_jsonrpc("tools/list", {})
    if not result:
        return None
    return result.get("tools", result) if isinstance(result, dict) else result


def get_metadata_field(function_name: str, field_path: str) -> Optional[Any]:
    """Look up a dotted path (e.g. ``"tool_type"``) inside a tool's ``_meta`` dict.

    Issues a fresh ``tools/list`` request on every call. Returns None when the
    tool or any path segment is missing.
    """
    tools = get_all_tools_metadata()
    if not tools:
        return None
    tool = next((t for t in tools if t.get("name") == function_name), None)
    if not tool:
        return None
    current = tool.get("_meta", {})
    for part in field_path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return None
    return current


def call_mcp_tool(tool_name: str, arguments: dict) -> Optional[Any]:
    """Invoke an MCP tool via ``tools/call``; returns its result or None on failure."""
    return _mcp_jsonrpc("tools/call", {"name": tool_name, "arguments": arguments})


def get_openai_tools() -> Optional[List[dict]]:
    """Load the tools from the MCP server and translate them into the OpenAI tool format.

    Each MCP ``inputSchema`` becomes the function's ``parameters``. Returns
    None when MCP is unavailable or reports no tools.
    """
    if not is_mcp_available():
        return None

    mcp_tools = get_all_tools_metadata()
    if not mcp_tools:
        return None

    return [
        {
            "type": "function",
            "function": {
                "name": tool.get("name"),
                "description": tool.get("description", ""),
                "parameters": tool.get("inputSchema", {}),
            },
        }
        for tool in mcp_tools
    ]


# ==================== Playback Worker ====================

class ItemType(Enum):
    TEXT = "text"
    FUNCTION_CALL = "function_call"


@dataclass
class PlaybackItem:
    item_type: ItemType
    content: str  # spoken text for TEXT items, tool name for FUNCTION_CALL items
    arguments: Optional[Dict] = None  # tool arguments (FUNCTION_CALL only)
    audio_path: Optional[Path] = None  # rendered audio clip (TEXT only)
    expression: str = "relaxed"
    duration: float = 0.0  # seconds; used to pace playback
    needs_sync: bool = False  # FUNCTION_CALL: whether to actually call the MCP tool


class OrderedPlaybackWorker:
    """Sequentially process text + function items so audio never overlaps."""

    def __init__(self):
        self.queue: Queue = Queue()
        self.thread = Thread(target=self._run, daemon=True)
        self._running = False
        self._talking = False
        # Makes "clear event + put" (enqueue) and "queue empty? -> set event"
        # (_run) atomic with respect to each other. Without it, an item
        # enqueued between the worker's empty() check and set() would leave
        # the event set while work is still pending.
        self._state_lock = Lock()
        self.queue_finished_event = Event()
        self.queue_finished_event.set()

    def start(self):
        """Start the worker thread (no-op if already started)."""
        if not self._running:
            self._running = True
            self.thread.start()
            print("[playback] worker started")

    def enqueue(self, item: PlaybackItem):
        """Queue *item* and mark the queue as busy."""
        with self._state_lock:
            self.queue_finished_event.clear()
            self.queue.put(item)

    def wait_until_finished(self, timeout=None) -> bool:
        """Block until the queue drains; returns False if *timeout* expired first."""
        return self.queue_finished_event.wait(timeout)

    def stop(self):
        """Ask the worker to exit after pending items and wait for it."""
        if not self._running:
            # Joining a thread that was never started raises RuntimeError.
            return
        self.queue.put(None)
        self.thread.join()

    def _run(self):
        while True:
            item = self.queue.get()
            if item is None:
                break

            try:
                if item.item_type == ItemType.TEXT:
                    self._process_text_item(item)
                elif item.item_type == ItemType.FUNCTION_CALL:
                    self._process_function_item(item)
            except Exception as e:
                # Keep the worker alive: if this thread died, queue_finished_event
                # would never be set again and wait_until_finished() would hang.
                print(f"[playback] item {item.content!r} failed: {e}")

            with self._state_lock:
                if self.queue.empty():
                    self._talking = False
                    self.queue_finished_event.set()
                    print("[playback] queue drained")

    def _process_text_item(self, item: PlaybackItem):
        """Send one audio clip to the VRM client, then sleep for roughly its length."""
        preview = item.content.replace("\n", " ")
        print(f"[playback] talk ({item.duration:.2f}s): {preview!r}")
        # Computed outside the try so the sleep below always has a value.
        safe_duration = math.ceil(item.duration)
        try:
            vrm_talk(str(item.audio_path), item.expression, item.content, safe_duration)
        except Exception as e:
            print(f"[playback] vrm_talk failed: {e}")

        # Pacing sleep so the next item does not start over this one.
        # NOTE(review): presumably vrm_talk returns before playback ends - confirm.
        time.sleep(max(0.05, safe_duration - 0.5))

    def _process_function_item(self, item: PlaybackItem):
        """Run a queued tool call in order with speech (only when needs_sync is set)."""
        print(f"[playback] async animation function: {item.content}({item.arguments})")
        if item.needs_sync:
            tool_args = dict(item.arguments or {})
            tool_args["manual_call"] = True
            result = call_mcp_tool(item.content, tool_args)
            print(f"[playback] function {item.content} result: {result}")


# ==================== History ====================

def _initial_system_message() -> dict:
    """Build the system message for a brand-new history.

    TOOL_USE_RULES is appended only when an MCP config is present, so the
    model is told about tool calling only when tools can actually be offered.
    """
    content = BASE_SYSTEM_PROMPT
    if is_mcp_available():
        content = f"{BASE_SYSTEM_PROMPT}\n{TOOL_USE_RULES.strip()}"
    return {"role": "system", "content": content}
def load_history() -> List[dict]:
    """Return the persisted chat history, or a fresh one holding only the system message.

    The history is read as JSON from HISTORY_FILE (set by the persona config)
    when that file exists.
    """
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return [_initial_system_message()]


def save_history(history: List[dict]):
    """Write *history* to HISTORY_FILE as indented JSON.

    The JSON goes to a sibling ``.tmp`` file first and is then moved into place
    with os.replace. A crash mid-write therefore cannot leave a truncated file
    that load_history would fail to parse.
    """
    tmp_path = f"{HISTORY_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(history, f, indent=2)
    os.replace(tmp_path, HISTORY_FILE)


# ==================== Streaming ====================

MIN_CHUNK_LEN = 60
MAX_CHUNK_LEN = 350
CHUNK_PUNCT = (".", "?", "!", "…")


def stream_with_functions(messages):
    """Stream one Chat Completions response (OpenRouter-compatible), including tool calls.

    Yields ``(text, "text")`` pieces as speakable chunks become available. A
    chunk is flushed when it ends in CHUNK_PUNCT and is at least MIN_CHUNK_LEN
    characters, or once it reaches MAX_CHUNK_LEN. The generator then yields
    exactly one ``(tool_calls, "final_responses")``, where *tool_calls* is a
    (possibly empty) list of OpenAI-format assistant ``tool_calls`` entries.
    """
    tools = get_openai_tools()

    api_kwargs = {
        "model": MODEL,
        "messages": messages,
        "temperature": 1,
        "top_p": 1,
        "max_tokens": 2048,
        "stream": True,
    }
    if tools:
        api_kwargs["tools"] = tools
        api_kwargs["tool_choice"] = "auto"

    stream = client.chat.completions.create(**api_kwargs)

    buffer = ""
    # Tool calls arrive as fragments keyed by their index; id/name usually come
    # in the first fragment and arguments are streamed piecewise.
    tool_calls_buffer: Dict[Any, dict] = {}

    for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta

        # 1. Plain text: flush speakable chunks as soon as they are ready.
        if delta.content:
            buffer += delta.content
            ends_sentence = buffer.endswith(CHUNK_PUNCT) and len(buffer) >= MIN_CHUNK_LEN
            if ends_sentence or len(buffer) >= MAX_CHUNK_LEN:
                yield buffer.strip(), "text"
                buffer = ""

        # 2. Accumulate tool-call fragments.
        for tc in delta.tool_calls or []:
            entry = tool_calls_buffer.setdefault(
                tc.index, {"id": None, "name": None, "arguments": ""}
            )
            # Some providers send id/name in a later fragment than the first.
            if tc.id and not entry["id"]:
                entry["id"] = tc.id
            fn = tc.function
            if fn is None:
                continue
            if fn.name and not entry["name"]:
                entry["name"] = fn.name
            if fn.arguments:
                entry["arguments"] += fn.arguments

    if buffer.strip():
        yield buffer.strip(), "text"

    final_tool_calls = [
        {
            # A tool message must reference a call id, so synthesize one if the
            # provider never sent it.
            "id": tc["id"] or f"call_{uuid.uuid4().hex[:24]}",
            "type": "function",
            "function": {"name": tc["name"], "arguments": tc["arguments"]},
        }
        for tc in tool_calls_buffer.values()
    ]
    yield final_tool_calls, "final_responses"


# ==================== Helpers ====================

def ensure_dirs():
    """Create the local audio output directories if they are missing."""
    Path("client/audio").mkdir(parents=True, exist_ok=True)
    Path("audio").mkdir(parents=True, exist_ok=True)


def _safe_set_state(state: str):
    """Best-effort VRM state change; failures are logged, never raised."""
    try:
        set_vrm_state(state)
    except Exception as e:
        print(f"[main] set_vrm_state({state}) failed: {e}")


def _action_to_text(action: dict) -> str:
    """Describe a click action as a bracketed stage direction for the LLM.

    Prefers the 'region' (unless it is "body"), then the 'bone', else "body".
    """
    region = (action.get("region") or "").strip()
    bone = (action.get("bone") or "").strip()
    if region and region.lower() != "body":
        label = region.replace("_", " ")
    elif bone:
        label = bone.replace("_", " ")
    else:
        label = "body"
    return f"[the user touched your {label}]"


def fetch_pending_user_actions() -> List[dict]:
    """Pop pending click actions from the local server.

    Deliberately best-effort: any failure (server down, bad payload) yields an
    empty list, because this is polled continuously.
    """
    try:
        resp = requests.get(f"{SERVER_BASE_URL}/pop_pending_actions", timeout=2)
        resp.raise_for_status()
        return resp.json().get("actions", [])
    except Exception:
        return []


def _persist_assistant_turn(messages: List[dict], all_final_responses: list, fallback_text: str):
    """Append the assistant message (text and/or tool_calls) to *messages*.

    ``content`` is None when the model produced no text, which is the form
    the Chat Completions API expects for tool-call-only messages.
    """
    assistant_msg = {"role": "assistant", "content": fallback_text or None}
    if all_final_responses:
        assistant_msg["tool_calls"] = all_final_responses
    messages.append(assistant_msg)


# ==================== LLM Turn ====================

_llm_turn_lock = Lock()

# Upper bound on LLM <-> tool round trips per user turn, so a model that keeps
# calling tools cannot loop forever.
MAX_TOOL_ROUNDS = 8

_DEFAULT_VOICE_ID = "wo6udizrrtpIxWGp2qJk"


def _synthesize_and_enqueue(text_chunk: str, playback: OrderedPlaybackWorker) -> None:
    """Render *text_chunk* with ElevenLabs and queue it for playback.

    Chunks that are empty after TTS cleanup are skipped. Playback is also
    skipped, with a log line, when no audio file was produced.
    """
    tts_text = clean_llm_output(text_chunk)
    if not tts_text.strip():
        return

    expression = get_vrm_expression(text_chunk)

    filename = f"output_{uuid.uuid4().hex}.mp3"
    client_out = Path("client") / "audio" / filename
    public_out = Path("audio") / filename
    client_out.parent.mkdir(parents=True, exist_ok=True)

    try:
        voice_id = char_config.get("voice_id", _DEFAULT_VOICE_ID)
        elevenlabs_gen(tts_text, str(client_out), voice_id)
    except Exception as e:
        print(f"[ElevenLabs Error] API Aufruf fehlgeschlagen: {e}")

    # Only copy and play when ElevenLabs actually produced a file.
    if not client_out.exists():
        print(f"[Fehler] ElevenLabs hat die Datei nicht erstellt! Überspringe Playback für: {text_chunk}")
        return

    public_out.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(client_out, public_out)

    # Heuristic length estimate (~15 chars/s + 1 s), at least 2 s.
    duration = max(2.0, len(text_chunk) / 15.0 + 1.0)

    playback.enqueue(
        PlaybackItem(
            item_type=ItemType.TEXT,
            content=text_chunk,
            audio_path=str(public_out),
            expression=expression,
            duration=duration,
        )
    )


def _parse_tool_arguments(raw: str) -> dict:
    """Decode a tool call's JSON argument string; anything but a JSON object becomes {}."""
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return {}
    # "null", lists, etc. would break the tool_args["manual_call"] assignment below.
    return parsed if isinstance(parsed, dict) else {}


def _tool_result_to_text(result: Any) -> str:
    """Flatten an MCP ``tools/call`` result into the text of a "tool" message."""
    if result is None:
        # call_mcp_tool returns None when the request failed (the cause is logged there).
        return json.dumps({"status": "error", "message": "tool call failed"})
    if not isinstance(result, dict) or "content" not in result:
        return str(result)

    content = result["content"]
    if isinstance(content, list):
        texts = [
            part["text"]
            for part in content
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        ]
        return "\n".join(texts) if texts else str(result)
    if isinstance(content, str):
        return content
    # The API requires string content; serialize structured payloads.
    return json.dumps(content, ensure_ascii=False, default=str)


def _execute_tool_calls(tool_calls: list, messages: List[dict], playback: OrderedPlaybackWorker) -> None:
    """Run each requested tool and append a matching "tool" message to *messages*.

    Tools whose MCP ``_meta.tool_type`` is "need_sync" (animations) are queued
    on *playback*, so they run in order with speech, and are acknowledged to
    the model immediately. Every other tool is treated as a data query, called
    synchronously, and its result is returned to the model.
    """
    for tc in tool_calls:
        tool_name = tc["function"]["name"]
        tool_args = _parse_tool_arguments(tc["function"]["arguments"])

        if get_metadata_field(tool_name, "tool_type") == "need_sync":
            playback.enqueue(
                PlaybackItem(
                    item_type=ItemType.FUNCTION_CALL,
                    content=tool_name,
                    arguments=tool_args,
                    needs_sync=True,
                )
            )
            # Animations have no textual result; report success so the model can continue.
            content = json.dumps({"status": "action executed successfully"})
        else:
            print(f"[turn] executing DATA tool immediately: {tool_name}")
            tool_args["manual_call"] = True
            content = _tool_result_to_text(call_mcp_tool(tool_name, tool_args))
            print("[turn] Data received, sending back to LLM...")

        messages.append({
            "role": "tool",
            "tool_call_id": tc["id"],
            "name": tool_name,
            "content": content,
        })


def run_llm_turn(user_text: str, playback: OrderedPlaybackWorker, mcp_on: bool):
    """
    Run full LLM turns in a loop: handles text + iterative tool calls natively.

    Each round streams a response, speaks its text chunks, and records the
    assistant message. If the model requested tools, they are executed and
    their results fed back for another round, up to MAX_TOOL_ROUNDS rounds.
    The history is saved once the turn completes. Turns are serialized by
    _llm_turn_lock because both the voice loop and the click dispatcher call
    this function. *mcp_on* is currently unused; tool availability is checked
    per request in stream_with_functions.
    """
    with _llm_turn_lock:
        print(f"\n[turn] >>> {user_text!r}")
        _safe_set_state("talking")

        messages = load_history()
        messages.append({"role": "user", "content": user_text})

        for round_no in range(1, MAX_TOOL_ROUNDS + 1):
            full_assistant_text = ""
            tool_calls: list = []

            for item, item_type in stream_with_functions(messages):
                if item_type == "text":
                    full_assistant_text += item + " "
                    _synthesize_and_enqueue(item, playback)
                elif item_type == "final_responses":
                    tool_calls = item

            final_text = full_assistant_text.strip()
            if final_text:
                print(f"[turn] full text response: {final_text!r}")

            # Record the model's reply (text and/or tool requests) in the history.
            _persist_assistant_turn(messages, tool_calls, final_text)

            # No tools requested: the turn is complete.
            if not tool_calls:
                break

            _execute_tool_calls(tool_calls, messages, playback)

            if round_no == MAX_TOOL_ROUNDS:
                print(f"[turn] tool round limit ({MAX_TOOL_ROUNDS}) reached, stopping")
                break
            print("[turn] Looping back to LLM for final response...")

        save_history(messages)
        print("[turn] history saved")


# ==================== Click Dispatcher (Background) ====================

def click_dispatcher(playback: OrderedPlaybackWorker, mcp_on: bool, poll_interval: float = 0.5):
    """Poll for click actions forever and run an LLM turn describing each batch."""
    while True:
        try:
            actions = fetch_pending_user_actions()
            if actions:
                user_text = " ".join(_action_to_text(a) for a in actions)
                print(f"[click_dispatcher] firing turn for {len(actions)} action(s): {user_text}")
                run_llm_turn(user_text, playback, mcp_on)
        except Exception as e:
            # Keep polling, but surface the failure instead of hiding it.
            print(f"[click_dispatcher] turn failed: {e}")
        time.sleep(poll_interval)


# ==================== Main Loop ====================

def main_loop():
    """Record speech, transcribe it, and run an LLM turn until interrupted."""
    ensure_dirs()
    mcp_on = is_mcp_available()
    print(f"\n========= Starting main_chat_v9 (MCP={'ON' if mcp_on else 'OFF'}) =========\n")

    playback = OrderedPlaybackWorker()
    playback.start()

    Thread(target=click_dispatcher, args=(playback, mcp_on), daemon=True).start()

    while True:
        try:
            print("\n[main] waiting for playback queue to finish...")
            playback.wait_until_finished()
            print("[main] queue finished - ready for input")
            _safe_set_state("idle")

            print("[main] recording - speak when ready")
            conversation_recording = Path("audio") / "conversation.wav"
            conversation_recording.parent.mkdir(parents=True, exist_ok=True)
            recording_path = str(conversation_recording)

            record_on_speech(
                output_file=recording_path,
                samplerate=48000,
                channels=1,
                silence_threshold=0.22,
                silence_duration=2,
                device="default",
            )

            _safe_set_state("thinking")
            user_spoken_text = transcribe_audio_groq(aud_path=recording_path)
            print(f"[main] transcribed: {user_spoken_text!r}")

            if not user_spoken_text or not user_spoken_text.strip():
                print("[main] empty transcription, skipping turn")
                continue

            run_llm_turn(user_spoken_text, playback, mcp_on)
            time.sleep(0.1)

        except KeyboardInterrupt:
            print("\n[main] interrupted, stopping playback")
            playback.stop()
            break
        except Exception as e:
            print(f"[main] error in main loop: {e}")
            import traceback
            traceback.print_exc()
            time.sleep(1)


if __name__ == "__main__":
    main_loop()