"""hakuro_MCP: a FastMCP server exposing avatar, Google and homelab tools.

At import time this module reads ``mcp_config.yaml`` next to this file,
generates an RSA key pair and a JWT access token, optionally opens an ngrok
tunnel, and writes the resulting URL/token to ``mcp_config.json``. When run
as a script it then starts the MCP server over HTTP.
"""

import os
import json
import yaml
import requests
import datetime
import os.path
import subprocess
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from dotenv import load_dotenv
from fastmcp import FastMCP
from fastmcp.server.auth.providers.jwt import RSAKeyPair
from fastmcp.server.auth import JWTVerifier
from pyngrok import ngrok

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# ─────────────────────────────
# 🔧 Environment & Config
# ─────────────────────────────
load_dotenv()

SCOPES = [
    'https://www.googleapis.com/auth/calendar.readonly',  # read the calendar
    'https://www.googleapis.com/auth/tasks',  # read and write to-dos
]

current_file = Path(__file__).resolve()
project_root = current_file.parent  # this file itself lives in the project root

server_name = "hakuro_MCP"
config_yaml_path = project_root / "mcp_config.yaml"
config_json_path = project_root / "mcp_config.json"

with open(config_yaml_path, "r", encoding="utf-8") as f:
    # safe_load returns None for an empty document; fall back to an empty
    # mapping so the .get() lookups below use their defaults instead of crashing.
    config_yaml = yaml.safe_load(f) or {}

# Ideally use a different port than the DiceCaller server (e.g. 9001).
port = config_yaml.get("port", 9001)
use_ngrok = config_yaml.get("ngrok", True)


# ─────────────────────────────
# 🔐 Google auth function
# ─────────────────────────────
def get_google_service(service_name: str, version: str):
    """Build an authorized Google API client, handling the OAuth2 login.

    Credentials are cached in ``token.json`` (relative to the current working
    directory). If no valid cached credentials exist they are refreshed, or a
    new local-server OAuth flow is started from ``credentials.json``; the
    resulting credentials are written back to ``token.json``.

    Args:
        service_name: Google API name passed to ``build`` (e.g. ``'calendar'``).
        version: API version passed to ``build`` (e.g. ``'v3'``).

    Returns:
        The service object returned by ``googleapiclient.discovery.build``.
    """
    creds = None
    # token.json stores the granted access after the first login.
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES)
            creds = flow.run_local_server(port=0, open_browser=False)
        with open('token.json', 'w', encoding="utf-8") as token:
            token.write(creds.to_json())

    return build(service_name, version, credentials=creds)


# ─────────────────────────────
# 🔐 Authentication & Ngrok
# ─────────────────────────────
key_pair = RSAKeyPair.generate()
access_token = key_pair.create_token(audience=server_name)

# NOTE(review): `auth` is constructed here but never handed to FastMCP below,
# so the server appears to accept unauthenticated requests even when exposed
# through ngrok. If that is unintended, pass `auth=auth` to FastMCP(...) and
# make sure every client sends the token from mcp_config.json.
auth = JWTVerifier(
    public_key=key_pair.public_key,
    audience=server_name,
)

if use_ngrok:
    ngrok_token = os.getenv("NGROK_AUTHTOKEN")
    if ngrok_token:
        ngrok.set_auth_token(ngrok_token)
    public_url = ngrok.connect(port, "http").public_url
else:
    public_url = f"http://localhost:{port}"

print(f"🌍 MCP Public URL: {public_url}")

config_data = {
    "server_name": server_name,
    "url": public_url,
    "token": access_token,
}

with open(config_json_path, "w", encoding="utf-8") as f:
    json.dump(config_data, f, indent=2)

print(f"✅ Config saved to {config_json_path}")

# ─────────────────────────────
# 🧮 MCP Server Setup
# ─────────────────────────────
mcp = FastMCP(name=server_name)

# Known action names mapped to the animation files sent to the avatar server.
_ANIMATION_MAP = {
    "wave": "animations/vrma/VRMA_02.vrma",
    "headpat": "animations/vrma/headpat_cover.vrma",
    "stop": "animations/vrma/stop_it.vrma",
    "look_left": "animations/vrma/lookleft.vrma",
    "look_right": "animations/vrma/lookright.vrma",
    "woah": "animations/vrma/woah.vrma",
}

# Seconds to wait for the local avatar server before giving up.
_AVATAR_REQUEST_TIMEOUT = 10


# Tool docstrings are kept verbatim: FastMCP exposes them to the model as the
# tool description. `manual_call` is hidden from the tool schema via exclude_args.
@mcp.tool(meta={"tool_type": "need_sync"}, exclude_args=["manual_call"])
def perform_action(action_name: str, manual_call: bool = False):
    """
    Use this tool to perform physical actions or animations with your virtual body.
    Allowed values for action_name are: "wave", "headpat", "stop", "look_left", "look_right", "woah".
    Call this tool exactly when you want to physically do the action in the real world.
    """
    print(f"🎬 Animation getriggert: {action_name}")

    if manual_call:
        print(f"[Playback] Aktion '{action_name}' wird asynchron im Playback-Worker ausgeführt.")

    file_path = _ANIMATION_MAP.get(action_name)
    if file_path is None:
        # Fallback for names the model invents: look for a file of that name.
        # The name is caller-supplied, so refuse anything that could escape
        # the animations directory.
        if not action_name or any(part in action_name for part in ("/", "\\", "..")):
            return f"Fehler beim Senden der Aktion '{action_name}': ungültiger Aktionsname"
        file_path = f"animations/vrma/{action_name}.vrma"

    payload = {
        "animate_type": "auto",
        "animation_url": file_path,
        "play_once": True,
    }
    try:
        # Without a timeout a hung avatar server would block this tool forever.
        response = requests.post(
            "http://localhost:8001/animate",
            json=payload,
            timeout=_AVATAR_REQUEST_TIMEOUT,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        return f"Fehler beim Senden der Aktion '{action_name}': {e}"
    return f"Aktion '{action_name}' erfolgreich an den Avatar-Server gesendet!"


# ==========================================
# 📅 TOOL 1: Read Google Calendar
# ==========================================
@mcp.tool(meta={"tool_type": "can_async"}, exclude_args=["manual_call"])
def get_calendar_events(max_results: int = 5, manual_call: bool = False):
    """
    Use this tool to check the user's upcoming calendar events and appointments.
    Returns a list of the next upcoming events.
    """
    try:
        service = get_google_service('calendar', 'v3')

        # Timezone-aware "now" in UTC, rendered with the 'Z' suffix.
        now = datetime.datetime.now(datetime.timezone.utc).isoformat().replace('+00:00', 'Z')

        events_result = service.events().list(
            calendarId='primary',
            timeMin=now,
            maxResults=max_results,
            singleEvents=True,
            orderBy='startTime',
        ).execute()
        events = events_result.get('items', [])

        if not events:
            return "Du hast keine anstehenden Termine im Kalender."

        lines = []
        for event in events:
            start_info = event.get('start', {})
            # Prefer 'dateTime' and fall back to 'date' when it is absent.
            start = start_info.get('dateTime', start_info.get('date'))
            # Events without a title carry no 'summary' key; do not let one
            # such event abort the whole listing.
            summary = event.get('summary', '(Ohne Titel)')
            lines.append(f"- {start}: {summary}\n")
        return "Deine nächsten Termine:\n" + "".join(lines)
    except Exception as e:
        # Tool boundary: report any auth/API failure to the caller as text.
        return f"Fehler beim Laden des Kalenders: {e}"


# ==========================================
# 📝 TOOL 2: Create a new to-do (task)
# ==========================================
@mcp.tool(meta={"tool_type": "can_async"}, exclude_args=["manual_call"])
def create_todo_task(title: str, notes: str = "", manual_call: bool = False):
    """
    Use this tool to add a new task or to-do to the user's to-do list.
    Arguments:
    - title: The main summary of the task (e.g., 'Buy milk')
    - notes: Optional extra details or description for the task
    """
    try:
        service = get_google_service('tasks', 'v1')

        # Create the task in the default list (@default).
        task_data = {
            'title': title,
            'notes': notes,
        }
        result = service.tasks().insert(tasklist='@default', body=task_data).execute()
        return f"Erfolgreich To-Do erstellt: '{result['title']}'"
    except Exception as e:
        # Tool boundary: report any auth/API failure to the caller as text.
        return f"Fehler beim Erstellen des To-Dos: {e}"


# ==========================================
# 🩺 TOOL 3: Homelab ping check
# ==========================================

# Display name -> IP address of every host probed by check_homelab_status.
_HOMELAB_HOSTS = {
    "Proxmox 1 (pve)": "192.168.51.2",
    "Proxmox 2 (pve-p50)": "192.168.51.3",
    "Element Chat": "192.168.140.2",
    "Docker Server": "192.168.140.3",
    "Monitoring": "192.168.145.2",
    "Access": "192.168.150.2",
    "Home Assistant": "192.168.140.10",
    "Arr-Stack": "192.168.20.6",
    "Jellyfin": "192.168.20.7",
    "Homarr": "192.168.20.8",
    "Nginx Proxy": "192.168.89.157",
    "Wazuh": "192.168.145.4",
    "Cloud: ElementCall": "76.13.11.100",
    "Cloud: OpenClaw": "76.13.154.77",
}


def _ping_host(ip: str) -> bool:
    """Return True if a single ping to *ip* exits with status 0."""
    try:
        # Linux ping flags: -c 1 (one packet), -W 1 (one second timeout).
        # The output is never inspected, so it is discarded; the outer
        # timeout is a safety net in case ping itself stalls.
        result = subprocess.run(
            ['ping', '-c', '1', '-W', '1', ip],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=5,
        )
    except subprocess.TimeoutExpired:
        return False
    return result.returncode == 0


@mcp.tool(meta={"tool_type": "can_async"}, exclude_args=["manual_call"])
def check_homelab_status(manual_call: bool = False):
    """
    Use this tool to check if the user's homelab servers, Proxmox nodes, and VMs are online.
    Call this when the user asks about their servers, infrastructure, or homelab status.
    """
    try:
        # Ping all hosts concurrently so unreachable hosts do not add up to
        # one timeout each; map() yields results in input order.
        with ThreadPoolExecutor(max_workers=len(_HOMELAB_HOSTS)) as pool:
            reachable = list(pool.map(_ping_host, _HOMELAB_HOSTS.values()))
    except OSError as e:
        # E.g. the ping binary is missing; report it like the other tools do.
        return f"Fehler beim Homelab-Check: {e}"

    status_report = ["🎛️ Homelab Server Status (Ping):"]
    status_report.extend(
        f"🟢 {name}: ONLINE" if is_up else f"🔴 {name}: OFFLINE"
        for name, is_up in zip(_HOMELAB_HOSTS, reachable)
    )
    return "\n".join(status_report)


# ─────────────────────────────
# ▶️ Run Server
# ─────────────────────────────
if __name__ == "__main__":
    print(f"\n---\n🔑 {server_name} Access Token:\n{access_token}\n---\n")
    mcp.run(transport="http", port=port, stateless_http=True)